costin      02/03/20 15:38:51

  Added:       jk/native2/common jk_worker_ajp13.c
  Removed:     jk/native2/common jk_ajp14_worker.c
  Log:
  Renamed the ajp worker to follow the rest of the code naming scheme.
  
  Also reverted back to ajp13 - all 14 extensions will consist on
  additional headers, the protocol is backward compatible.
  
  Revision  Changes    Path
  1.1                  jakarta-tomcat-connectors/jk/native2/common/jk_worker_ajp13.c
  
  Index: jk_worker_ajp13.c
  ===================================================================
  /* ========================================================================= *
   *                                                                           *
   *                 The Apache Software License,  Version 1.1                 *
   *                                                                           *
   *          Copyright (c) 1999-2001 The Apache Software Foundation.          *
   *                           All rights reserved.                            *
   *                                                                           *
   * ========================================================================= *
   *                                                                           *
   * Redistribution and use in source and binary forms,  with or without modi- *
   * fication, are permitted provided that the following conditions are met:   *
   *                                                                           *
   * 1. Redistributions of source code  must retain the above copyright notice *
   *    notice, this list of conditions and the following disclaimer.          *
   *                                                                           *
   * 2. Redistributions  in binary  form  must  reproduce the  above copyright *
   *    notice,  this list of conditions  and the following  disclaimer in the *
   *    documentation and/or other materials provided with the distribution.   *
   *                                                                           *
   * 3. The end-user documentation  included with the redistribution,  if any, *
   *    must include the following acknowlegement:                             *
   *                                                                           *
   *       "This product includes  software developed  by the Apache  Software *
   *        Foundation <http://www.apache.org/>."                              *
   *                                                                           *
   *    Alternately, this acknowlegement may appear in the software itself, if *
   *    and wherever such third-party acknowlegements normally appear.         *
   *                                                                           *
   * 4. The names  "The  Jakarta  Project",  "Jk",  and  "Apache  Software     *
   *    Foundation"  must not be used  to endorse or promote  products derived *
   *    from this  software without  prior  written  permission.  For  written *
   *    permission, please contact <[EMAIL PROTECTED]>.                        *
   *                                                                           *
   * 5. Products derived from this software may not be called "Apache" nor may *
   *    "Apache" appear in their names without prior written permission of the *
   *    Apache Software Foundation.                                            *
   *                                                                           *
   * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED WARRANTIES *
   * INCLUDING, BUT NOT LIMITED TO,  THE IMPLIED WARRANTIES OF MERCHANTABILITY *
   * AND FITNESS FOR  A PARTICULAR PURPOSE  ARE DISCLAIMED.  IN NO EVENT SHALL *
   * THE APACHE  SOFTWARE  FOUNDATION OR  ITS CONTRIBUTORS  BE LIABLE  FOR ANY *
   * DIRECT,  INDIRECT,   INCIDENTAL,  SPECIAL,  EXEMPLARY,  OR  CONSEQUENTIAL *
   * DAMAGES (INCLUDING,  BUT NOT LIMITED TO,  PROCUREMENT OF SUBSTITUTE GOODS *
   * OR SERVICES;  LOSS OF USE,  DATA,  OR PROFITS;  OR BUSINESS INTERRUPTION) *
   * HOWEVER CAUSED AND  ON ANY  THEORY  OF  LIABILITY,  WHETHER IN  CONTRACT, *
   * STRICT LIABILITY, OR TORT  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN *
   * ANY  WAY  OUT OF  THE  USE OF  THIS  SOFTWARE,  EVEN  IF  ADVISED  OF THE *
   * POSSIBILITY OF SUCH DAMAGE.                                               *
   *                                                                           *
   * ========================================================================= *
   *                                                                           *
   * This software  consists of voluntary  contributions made  by many indivi- *
   * duals on behalf of the  Apache Software Foundation.  For more information *
   * on the Apache Software Foundation, please see <http://www.apache.org/>.   *
   *                                                                           *
   * ========================================================================= */
  
  /**
   * Description: AJP14 next generation Bi-directional protocol.
   *              Backward compatible with Ajp13
   * Author:      Henri Gomez <[EMAIL PROTECTED]>
   * Author:      Costin <[EMAIL PROTECTED]>                              
   * Author:      Gal Shachor <[EMAIL PROTECTED]>                           
   */
  
  #include "jk_global.h"
  #include "jk_pool.h"
  #include "jk_channel.h"
  #include "jk_msg.h"
  #include "jk_logger.h"
  #include "jk_handler.h"
  #include "jk_service.h"
  #include "jk_env.h"
  #include "jk_objCache.h"
  #include "jk_registry.h"
  
  
  #define AJP_DEF_RETRY_ATTEMPTS    (2)
  #define AJP14_PROTO 14
  #define AJP13_PROTO 13
  
  #define AJP13_DEF_HOST  ("localhost")
  #define AJP13_DEF_PORT  (8009)
  #define AJP14_DEF_HOST  ("localhost")
  #define AJP14_DEF_PORT  (8011)
  
  /* -------------------- Impl -------------------- */
  
  
  /*
   * Initialize the worker.
   */
  static int 
  jk2_worker_ajp14_setAttribute(jk_env_t *env, jk_bean_t *mbean, 
                                char *name, void *valueP )
  {
      jk_worker_t *ajp14=(jk_worker_t *)mbean->object;
      char *value=(char *)valueP;
      int    port;
      char * host;
      int err;
      char * secret_key;
      char *channelType;
             
      if( strcmp( name, "secretkey" )==0 ) {
          ajp14->secret = value;
      } else if( strcmp( name, "cachesize" )==0 ) {
          ajp14->cache_sz=atoi( value );
      } else if( strcmp( name, "lb_factor" )==0 ) {
          ajp14->lb_factor=atof( value );
      } else if( strcmp( name, "channel" )==0 ) {
          if( strncmp( value, "channel.", 8 ) != 0 ) {
              char *newValue=(char *)ajp14->pool->calloc( env, ajp14->pool, 
strlen(value) + 10 );
              strcpy( newValue, "channel." );
              strcat( newValue, value );
              env->l->jkLog(env, env->l, JK_LOG_INFO, "ajp14.setProperty() auto 
replace %s %s\n",
                            value, newValue);
              value=newValue;
          }
          ajp14->channel=env->createInstance(env, ajp14->pool, value, NULL );
          if( ajp14->channel == NULL ) {
              env->l->jkLog(env, env->l, JK_LOG_ERROR,
                            "Error creating %s channel\n", channelType );
              return JK_FALSE;
          }
          env->l->jkLog(env, env->l, JK_LOG_INFO, "ajp14.setProperty() channel: %s 
%s\n",
                        value,ajp14->channel->mbean->name);
  
       } else {
          /* It's probably a channel property
           */
          if( ajp14->channel==NULL ) {
              
              env->l->jkLog(env, env->l, JK_LOG_ERROR,
                            "No channel for %s, set channel before other properties 
%s=%s\n",
                            mbean->name, name, value );
              return JK_FALSE;
          }
  
          env->l->jkLog(env, env->l, JK_LOG_INFO, "endpoint.setProperty() channel 
%s=%s\n",
                        name, value);
          ajp14->channel->mbean->setAttribute( env, ajp14->channel->mbean, name, value 
);
      }
  
      env->l->jkLog(env, env->l, JK_LOG_INFO,
                    "ajp14.setProperty() %s %s %s\n", mbean->name, name, value );
      
      return JK_TRUE;
  }
  
  #define JK_AJP13_PING               (unsigned char)8
  
  /* 
   * Build the ping cmd. Tomcat will get control and will be able 
   * to send any command.
   *
   * +-----------------------+
   * | PING CMD (1 byte) |
   * +-----------------------+
   *
   * XXX Add optional Key/Value set .
   *  
   */
  int jk2_serialize_ping(jk_env_t *env, jk_msg_t *msg,
                         jk_endpoint_t *ae)
  {
      int rc;
      
      /* To be on the safe side */
      msg->reset(env, msg);
  
      rc= msg->appendByte( env, msg, JK_AJP13_PING);
      if (rc!=JK_TRUE )
          return JK_FALSE;
  
      return JK_TRUE;
  }
  
  
  /*
   * Close the endpoint (clean buf and close socket)
   */
  static void jk2_close_endpoint(jk_env_t *env, jk_endpoint_t *ae)
  {
      env->l->jkLog(env, env->l, JK_LOG_INFO, "endpoint.close() %s\n",
                    ae->worker->mbean->name);
  
      ae->reuse = JK_FALSE;
      ae->worker->channel->close( env, ae->worker->channel, ae );
      ae->cPool->reset( env, ae->cPool );
      ae->cPool->close( env, ae->cPool );
      ae->pool->reset( env, ae->pool );
      ae->pool->close( env, ae->pool );
  }
  
  /** Connect a channel, implementing the logging protocol if ajp14
   */
  static int jk2_worker_ajp14_connect(jk_env_t *env, jk_endpoint_t *ae) {
      jk_channel_t *channel=ae->worker->channel;
      jk_msg_t *msg;
      
      int err=channel->open( env, channel, ae );
  
      if( err != JK_TRUE ) {
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.connect() failed %s\n", ae->worker->mbean->name );
          return JK_FALSE;
      }
  
      /* Check if we must execute a logon after the physical connect */
      if (ae->worker->secret == NULL)
          return JK_TRUE;
  
      /* Do the logon process */
      env->l->jkLog(env, env->l, JK_LOG_INFO, "ajp14.connect() logging in\n" );
  
      /* use the reply buffer - it's a new channel, it is cetainly not
       in use. The request and post buffers are probably in use if this
      is a reconnect */
      msg=ae->reply;
  
      jk2_serialize_ping( env, msg, ae );
      
      err = ae->worker->channel->send( env, ae->worker->channel, ae, msg );
  
      /* Move to 'slave' mode, listening to messages */
      err=ae->worker->workerEnv->processCallbacks( env, ae->worker->workerEnv,
                                                   ae, NULL);
  
      /* Anything but OK - the login failed
       */
      if( err != JK_TRUE ) {
          jk2_close_endpoint( env, ae );
      }
      return err;
  }
  
  /** There is no point of trying multiple times - each channel may
      have built-in recovery mechanisms
  */
  #define JK_RETRIES 2
  
  /** First message in a stream-based connection. If the first send
      fails, try to reconnect.
  */
  static int JK_METHOD
  jk2_worker_ajp14_sendAndReconnect(jk_env_t *env, jk_worker_t *worker,
                                jk_ws_service_t *s,
                                jk_endpoint_t   *e )
  {
      int attempt;
      int err;
      
      /*
       * Try to send the request on a valid endpoint. If one endpoint
       * fails, close the channel and try again ( maybe tomcat was restarted )
       * 
       * XXX JK_RETRIES could be replaced by the number of workers in
       * a load-balancing configuration 
       */
      for(attempt = 0 ; attempt < JK_RETRIES ;attempt++) {
          jk_channel_t *channel= worker->channel;
  
          /* e->request->dump(env, e->request, "Before sending "); */
          err=e->worker->channel->send( env, e->worker->channel, e,
                                        e->request );
  
        if (err==JK_TRUE ) {
              /* We sent the request, have valid endpoint */
              break;
          }
  
          if( e->recoverable != JK_TRUE ) {
              env->l->jkLog(env, env->l, JK_LOG_ERROR,
                       "ajp14.service() error sending request %s, giving up\n",
                       worker->mbean->name);
              return JK_FALSE;
          }
          
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                  "ajp14.service() error sending, retry on a new endpoint %s\n",
                        e->worker->mbean->name);
  
          channel->close( env, channel, e );
  
          err=jk2_worker_ajp14_connect(env, e); 
  
          if( err != JK_TRUE ) {
              env->l->jkLog(env, env->l, JK_LOG_ERROR,
                       "ajp14.service() failed to reconnect endpoint errno=%d\n",
                       errno);
              return JK_FALSE;
          }
  
          /*
           * After we are connected, each error that we are going to
           * have is probably unrecoverable
           */
          e->recoverable = JK_FALSE;
      }
      return JK_TRUE;
  }
  
  
  static int JK_METHOD
  jk2_worker_ajp14_forwardStream(jk_env_t *env, jk_worker_t *worker,
                                jk_ws_service_t *s,
                                jk_endpoint_t   *e )
  {
      int err;
  
      err=jk2_worker_ajp14_sendAndReconnect( env, worker, s, e );
      if( err!=JK_TRUE )
          return err;
      
      /* We should have a channel now, send the post data */
      s->is_recoverable_error = JK_TRUE;
      e->recoverable = JK_TRUE;
  
      /* Prepare to send some post data ( ajp13 proto ). We do that after the
       request was sent ( we're receiving data from client, can be slow, no
       need to delay - we can do that in paralel. ( not very sure this is
       very usefull, and it brakes the protocol ) ! */
      if (s->is_chunked || s->left_bytes_to_send > 0) {
          /* We never sent any POST data and we check it we have to send at
         * least of block of data (max 8k). These data will be kept in reply
         * for resend if the remote Tomcat is down, a fact we will learn only
         * doing a read (not yet) 
         */
          err=jk2_serialize_postHead( env, e->post, s, e );
  
          if (err != JK_TRUE ) {
              /* the browser stop sending data, no need to recover */
              e->recoverable = JK_FALSE;
              s->is_recoverable_error = JK_FALSE;
              env->l->jkLog(env, env->l, JK_LOG_ERROR,
                            "ajp14.service() Error receiving initial post \n");
              return JK_FALSE;
          }
          err= e->worker->channel->send( env, e->worker->channel, e,
                                         e->post );
      }
  
      err = e->worker->workerEnv->processCallbacks(env, e->worker->workerEnv,
                                                   e, s);
      
      /* if we can't get reply, check if no recover flag was set 
       * if is_recoverable_error is cleared, we have started received 
       * upload data and we must consider that operation is no more recoverable
       */
      if (! e->recoverable) {
          s->is_recoverable_error = JK_FALSE;
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.service() ajpGetReply unrecoverable error %d\n",
                        err);
          return JK_FALSE;
      }
  
      if( err != JK_TRUE ) {
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.service() ajpGetReply recoverable error %d\n",
                        err);
      }
      return err;
  }
  
  static int JK_METHOD
  jk2_worker_ajp14_forwardSingleThread(jk_env_t *env, jk_worker_t *worker,
                                      jk_ws_service_t *s,
                                      jk_endpoint_t   *e )
  {
      int err;
  
      env->l->jkLog(env, env->l, JK_LOG_INFO,
                    "ajp14.forwardST() Before calling native channel\n");
      err=e->worker->channel->send( env, e->worker->channel, e,
                                    e->request );
          env->l->jkLog(env, env->l, JK_LOG_INFO,
                    "ajp14.forwardST() After %d\n",err);
      
      
      return JK_TRUE;
  }
       
  static int JK_METHOD
  jk2_worker_ajp14_service1(jk_env_t *env, jk_worker_t *w,
                          jk_ws_service_t *s,
                          jk_endpoint_t   *e )
  {
      int err;
  
      if( ( e== NULL ) || ( s == NULL ) ) {
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.service() NullPointerException\n");
        return JK_FALSE;
      }
  
      e->currentRequest=s;
      
      /* Prepare the messages we'll use.*/ 
      e->request->reset( env, e->request );
      e->reply->reset( env, e->reply );
      e->post->reset( env, e->post );
      
      e->uploadfd        = -1;          /* not yet used, later ;) */
      e->reuse = JK_FALSE;
      s->is_recoverable_error = JK_TRUE;
      /* Up to now, we can recover */
      e->recoverable = JK_TRUE;
  
      s->left_bytes_to_send = s->content_length;
      s->content_read=0;
  
      /* 
       * We get here initial request (in reqmsg)
       */
      err=jk2_serialize_request13(env, e->request, s, e);
      if (err!=JK_TRUE) {
        s->is_recoverable_error = JK_FALSE;                
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.service(): error marshaling\n");
        return JK_FALSE;
      }
  
      env->l->jkLog(env, env->l, JK_LOG_INFO,
                    "ajp14.service() %s\n", e->worker->mbean->name);
  
      if( w->channel->beforeRequest != NULL ) {
          w->channel->beforeRequest( env, w->channel, w, e, s );
      }
      
      /* First message for this request */
      if( w->channel->is_stream == JK_TRUE ) {
          err=jk2_worker_ajp14_forwardStream( env, w, s, e );
      } else {
          err=jk2_worker_ajp14_forwardSingleThread( env, w, s, e );
      }
  
      if( w->channel->afterRequest != NULL ) {
          w->channel->afterRequest( env, w->channel, w, e, s );
      }
  
      e->currentRequest=NULL;
      
      return err;
  }
  
  
  
  
  static int JK_METHOD
  jk2_worker_ajp14_done(jk_env_t *env, jk_worker_t *we, jk_endpoint_t *e)
  {
      jk_worker_t *w;
      
      w= e->worker;
  
      if( e->cPool != NULL ) 
          e->cPool->reset(env, e->cPool);
      if (w->endpointCache != NULL ) {
          int err=0;
          err=w->endpointCache->put( env, w->endpointCache, e );
          if( err==JK_TRUE ) {
              env->l->jkLog(env, env->l, JK_LOG_INFO,
                            "ajp14.done() return to pool %s\n",
                            w->mbean->name );
              return JK_TRUE;
          }
      }
  
      /* No reuse or put() failed */
  
      jk2_close_endpoint(env, e);
      env->l->jkLog(env, env->l, JK_LOG_INFO,
                    "ajp14.done() close endpoint %s\n",
                    w->mbean->name );
      
      return JK_TRUE;
  }
  
  static int JK_METHOD
  jk2_worker_ajp14_getEndpoint(jk_env_t *env,
                              jk_worker_t *ajp14,
                              jk_endpoint_t **eP)
  {
      jk_endpoint_t *e = NULL;
      jk_pool_t *endpointPool;
      
      if( ajp14->secret ==NULL ) {
      }
  
      if (ajp14->endpointCache != NULL ) {
  
          e=ajp14->endpointCache->get( env, ajp14->endpointCache );
  
          if (e!=NULL) {
              env->l->jkLog(env, env->l, JK_LOG_INFO,
                            "ajp14.getEndpoint(): Reusing endpoint\n");
              *eP = e;
              return JK_TRUE;
          }
      }
  
      endpointPool = ajp14->pool->create( env, ajp14->pool, HUGE_POOL_SIZE );
      
      e = (jk_endpoint_t *)endpointPool->alloc(env, endpointPool,
                                                sizeof(jk_endpoint_t));
      if (e==NULL) {
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.get_endpoint OutOfMemoryException\n");
          return JK_FALSE;
      }
  
      e->pool = endpointPool;
  
      /* Init message storage areas.
       */
      e->request = jk2_msg_ajp_create( env, e->pool, 0);
      e->reply = jk2_msg_ajp_create( env, e->pool, 0);
      e->post = jk2_msg_ajp_create( env, e->pool, 0);
      
      e->reuse = JK_FALSE;
  
      e->cPool=endpointPool->create(env, endpointPool, HUGE_POOL_SIZE );
  
      e->worker = ajp14;
      e->channelData = NULL;
      
      *eP = e;
      return JK_TRUE;
  }
  
  /*
   * Serve the request, using AJP13/AJP14
   */
  static int JK_METHOD
  jk2_worker_ajp14_service(jk_env_t *env, jk_worker_t *w,
                          jk_ws_service_t *s)
  {
      int err;
      jk_endpoint_t   *e;
  
      /* Get endpoint from the pool */
      jk2_worker_ajp14_getEndpoint( env, w, &e );
  
      err=jk2_worker_ajp14_service1( env, w, s, e );
  
      jk2_worker_ajp14_done( env, w, e);
      return err;
  }
  
  
  static int JK_METHOD
  jk2_worker_ajp14_init(jk_env_t *env, jk_worker_t *ajp14)
  {
      int  rc;
  
      if( ajp14->cache_sz == -1 )
          ajp14->cache_sz=JK_OBJCACHE_DEFAULT_SZ;
  
      if (ajp14->cache_sz > 0) {
          ajp14->endpointCache=jk2_objCache_create( env, ajp14->pool  );
          
          if( ajp14->endpointCache != NULL ) {
              rc=ajp14->endpointCache->init( env, ajp14->endpointCache,
                                             ajp14->cache_sz );
              if( rc!= JK_TRUE ) {
                  ajp14->endpointCache=NULL;
              }
          }
      } else {
          ajp14->endpointCache=NULL;
      }
  
      if( ajp14->channelName == NULL ) {
          /* Use default channel */
          ajp14->channelName="channel.default";
      }
  
      ajp14->channel= env->getByName( env, ajp14->channelName );
      
      if( ajp14->channel == NULL ) {
          /* XXX  Create a default channel using socket/localhost/8009 ! */
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.validate(): no channel found %s\n",
                        ajp14->channelName);
          return JK_FALSE;
      }
      
      ajp14->channel->worker=ajp14;
  
      rc=ajp14->channel->init( env, ajp14->channel );
      if( rc != JK_TRUE ) {
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.init(): channel init failed\n");
          return rc;
      }
  
      return JK_TRUE;
  }
  
  
  static int JK_METHOD
  jk2_worker_ajp14_destroy(jk_env_t *env, jk_worker_t *ajp14)
  {
      int i;
      
      env->l->jkLog(env, env->l, JK_LOG_INFO,
                    "ajp14.destroy()\n");
  
      if( ajp14->endpointCache != NULL ) {
          jk_endpoint_t *e;
  
          while( ajp14->endpointCache->count > 0 ) {
              
              e= ajp14->endpointCache->get( env, ajp14->endpointCache );
              
              if( e==NULL ) {
                  // we finished all endpoints in the cache
                  break;
              }
              
              jk2_close_endpoint(env, e);
          }
          ajp14->endpointCache->destroy( env, ajp14->endpointCache );
  
          env->l->jkLog(env, env->l, JK_LOG_DEBUG,
                        "ajp14.destroy() closed %d cached endpoints\n",
                        i);
      }
  
      ajp14->pool->close( env, ajp14->pool );
  
      return JK_TRUE;
  }
  
  int JK_METHOD jk2_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool,
                                          jk_bean_t *result,
                                          const char *type, const char *name)
  {
      jk_worker_t *w=(jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t));
  
      if (name == NULL || w == NULL) {
          env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.factory() NullPointerException\n");
          return JK_FALSE;
      }
      w->pool = pool;
      w->cache_sz=-1;
      
      w->endpointCache= NULL;
  
      w->channel= NULL;
      w->secret= NULL;
     
      w->init= jk2_worker_ajp14_init;
      w->destroy=jk2_worker_ajp14_destroy;
      w->service = jk2_worker_ajp14_service;
  
      result->setAttribute= jk2_worker_ajp14_setAttribute;
      result->object = w;
      w->mbean=result;
  
      w->workerEnv=env->getByName( env, "workerEnv" );
      w->workerEnv->addWorker( env, w->workerEnv, w );
      
      return JK_TRUE;
  }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to