costin      02/05/09 14:06:48

  Modified:    jk/native2/common jk_worker_lb.c
  Log:
  That's the big one.
  
  Please review!
  
  It changes the handling of lb_value to int. I also cleaned up the logic so
  it's easier (I hope) to understand what's happening. "Levels" replace
  the 'local worker'; I think I got the logic straight for those.
  
  I started to add 'introspection' data, to validate the config and report
  it better.
  
  We use one table per level. At the moment the maximum number of workers
  is hardcoded (to 255). We could make it dynamic, but that would make things
  pretty complex when we add workers dynamically (it won't work without
  a critical section (CS) or atomic operations).
  
  Revision  Changes    Path
  1.12      +195 -181  jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c
  
  Index: jk_worker_lb.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- jk_worker_lb.c    9 May 2002 00:01:43 -0000       1.11
  +++ jk_worker_lb.c    9 May 2002 21:06:48 -0000       1.12
  @@ -77,6 +77,7 @@
   /* XXX make it longer - debugging only */
   #define WAIT_BEFORE_RECOVER (5) 
   
  +#define MAX_ATTEMPTS 3
   
   /** Find the best worker. In process, check if timeout expired
       for workers that failed in the past and give them another chance.
  @@ -90,115 +91,157 @@
                                                    jk_ws_service_t *s, int attempt)
   {
       jk_worker_t *rc = NULL;
  -    double lb_min = 0.0;    
  +    int lb_min = 0;    
  +    int lb_max = 0;    
       int i;
  +    int j;
  +    int level;
  +    int currentLevel=JK_LB_LEVELS - 1;
       char *session_route;
       time_t now = 0;
   
       session_route = jk2_requtil_getSessionRoute(env, s);
          
       if(session_route) {
  -        for(i = 0 ; i < lb->num_of_workers ; i++) {
  -            jk_worker_t *w=lb->lb_workers[i];
  -            
  -            if(w->route != NULL &&
  -               0 == strcmp(session_route, w->route)) {
  -                if(attempt > 0 && w->in_error_state) {
  -                   break;
  -                } else {
  -                    return w;
  -                 }
  +        for( level=0; level<JK_LB_LEVELS; level++ ) {
  +            for(i = 0 ; i < lb->workerCnt[level]; i++) {
  +                jk_worker_t *w=lb->workerTables[level][i];
  +                
  +                if(w->route != NULL &&
  +                   0 == strcmp(session_route, w->route)) {
  +                    if(attempt > 0 && w->in_error_state) {
  +                        /* We already tried to revive this worker. */
  +                        break;
  +                    } else {
  +                        return w;
  +                    }
  +                }
               }
           }
       }
   
  -    /** Get one worker that is ready */
  -    for(i = 0 ; i < lb->num_of_workers ; i++) {
  -        jk_worker_t *w=lb->lb_workers[i];
  -        
  -        if(w->in_error_state) {
  +    /** Get one worker that is ready
  +     */
  +    for( level=0; level<JK_LB_LEVELS; level++ ) {
  +        for(i = 0 ; i < lb->workerCnt[level] ; i++) {
  +            jk_worker_t *w=lb->workerTables[level][i];
  +
               if( w->mbean->disabled ) continue;
  -            
  -            /* Check if it's ready for recovery */
  -            /* if(!lb->lb_workers[i]->in_recovering) { */
  -            if( now==0 )
  -                now = time(NULL);
  -                
  -            if((now - w->error_time) > WAIT_BEFORE_RECOVER) {
  -                env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                              "lb.getWorker() timeout expired, reenable again %s\n",
  -                              w->mbean->name);
  -                
  -                w->in_recovering  = JK_TRUE;
  -                w->in_error_state = JK_FALSE;
  +            if( w->in_error_state ) continue;
   
  -                /* No need to do that - if it'll be used again, then error time
  -                   will be set automatically on error */
  -                /*  w->error_time     = now;   */
  -                /* Not sure we need that either */
  -                /*  w->retry_count++; */
  -
  -                /* The worker's error state is reset, but that doesn't
  -                   mean it'll be used - normal priority selection happens
  -                   Don't give bigger priority to recovered workers
  -                */
  -                /* rc = lb->lb_workers[i]; 
  -                   break;
  -                */
  +            if( rc==NULL ) {
  +                rc=w;
  +                currentLevel=level;
  +                lb_min=w->lb_value;
  +                continue;
               }
  -        }
  -        if( ! lb->lb_workers[i]->in_error_state ) {
  -            if(lb->lb_workers[i]->lb_value == 0 ) {
  -                /* That's the 'default' worker, it'll take all requests.
  -                 * All other workers are not used unless this is in error state.
  -                 *
  -                 * The 'break' will disable checking for recovery on other
  -                 * workers - but that doesn't matter as long as the default is 
alive.
  -                 */
  -                rc=lb->lb_workers[i];
  -                break;
  -            }
  -            if(lb->lb_workers[i]->lb_value < lb_min ||
  -               ( rc==NULL ) ) {
  -                lb_min = lb->lb_workers[i]->lb_value;
  -                rc = lb->lb_workers[i];
  +            
  +            if( w->lb_value < lb_min ) {
  +                lb_min = w->lb_value;
  +                rc = w;
  +                currentLevel=level;
               }
           }
  +
  +        if( rc!=NULL ) {
  +            /* We found a valid worker on the current level, don't worry about the
  +               higher levels.
  +            */
  +            break;
  +        }
       }
  +
  +    /** Reenable workers in error state if the timeout has passed.
  +     *  Don't bother with 'higher' levels, since we'll never try them.
  +     */
  +    for( level=0; level<=currentLevel; level++ ) {
  +
  +        for(i = 0 ; i < lb->workerCnt[level] ; i++) {
  +            jk_worker_t *w=lb->workerTables[level][i];
  +
  +            if( w->mbean->disabled ) continue;
           
  +            if(w->in_error_state) {
  +                /* Check if it's ready for recovery */
  +                if( now==0 ) now = time(NULL);
  +                
  +                if((now - w->error_time) > WAIT_BEFORE_RECOVER) {
  +                    env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +                                  "lb.getWorker() reenable %s\n", w->mbean->name);
  +                    w->in_error_state = JK_FALSE;
  +
  +                    /* Find max lb */
  +                    if( lb_max ==0 ) {
  +                        for( j=0; j<lb->workerCnt[level]; j++ ) {
  +                            if( lb->workerTables[level][j]->lb_value > lb_max ) {
  +                                lb_max=lb->workerTables[level][j]->lb_value;
  +                            }
  +                        }
  +                    }
  +                    w->lb_value = lb_max;
  +                }
  +            }
  +        }
  +    }
  +
  +    /* If no active worker is found, we'll try all workers in error_state,
  +    */
       if ( rc==NULL ) {
           /* no workers found (rc is null), now try as hard as possible to get a
              worker anyway, pick one with largest error time.. */
  -        env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +        
  +        env->l->jkLog(env, env->l, JK_LOG_INFO,
                         "lb.getWorker() All workers in error state, use the one with 
oldest error\n");
           
  -        for(i = 0 ; i < lb->num_of_workers ; i++) {
  -            jk_worker_t *w=lb->lb_workers[i];
  -            
  -            if( w->mbean->disabled == JK_TRUE ) continue;
  -            
  -            if ( rc != NULL ) {
  +        for( level=0; level<JK_LB_LEVELS; level++ ) {
  +            for(i = 0 ; i < lb->workerCnt[level] ; i++) {
  +                jk_worker_t *w=lb->workerTables[level][i];
  +
  +                if( w->mbean->disabled == JK_TRUE ) continue;
  +
  +                if( rc==NULL ) {
  +                    rc= w;
  +                    currentLevel=level;
  +                    continue;
  +                }
                   /* pick the oldest failed worker */
                   if ( w->error_time < rc->error_time ) {
  +                    currentLevel=level;
                       rc = w;
                   }
  -            } else {
  -                rc = w;
               }
           }
  -    
  -        if ( rc  && rc->in_error_state ) {
  -            rc->in_recovering  = JK_TRUE;
  -            rc->in_error_state  = JK_FALSE;
  -        }
       }
       
  -    if(rc) {
  +    if(rc!=NULL) {
  +        /* It it's the default, it'll remain the default - we don't
  +           increase the factor
  +        */
  +        rc->in_error_state  = JK_FALSE;
           if( rc->lb_value != 0 ) {
  -            /* It it's the default, it'll remain the default - we don't
  -               increase the factor
  -            */
  -            rc->lb_value += rc->lb_factor;
  +            int newValue=rc->lb_value + rc->lb_factor;
  +            
  +            if( newValue > 255 ) {
  +                rc->lb_value=rc->lb_factor;
  +                /* Roll over. This has 2 goals:
  +                   - avoid the lb factor becoming too big, and give a chance to run 
to
  +                   workers that were in error state ( I think it's cleaner than 
looking for "max" )
  +                   - the actual lb_value will be 1 byte. Even on the craziest 
platform, that
  +                   will be an atomic write. We do a lot of operations on lb_value 
in a MT environment,
  +                   and the chance of reading something inconsistent is 
considerable. Since APR
  +                   will not support atomic - and adding a CS would cost too much, 
this is actually
  +                   a good solution.
  +
  +                   Note that lb_value is not used for anything critical - just to 
balance the load,
  +                   the worst that may happen is having a worker stay idle for 255 
requests.
  +                */
  +                for(i = 0 ; i < lb->workerCnt[currentLevel] ; i++) {
  +                    jk_worker_t *w=lb->workerTables[currentLevel][i];
  +                    w->lb_value=w->lb_factor;
  +                }
  +            } else {
  +                rc->lb_value=newValue;
  +            }
           }
       }
   
  @@ -210,17 +253,21 @@
                                             jk_worker_t *lb,
                                             char *instanceId )
   {
  -    int j;
  +    int i;
  +    int level;
       
  -    for( j=0; j< lb->num_of_workers; j++ ) {
  -        jk_worker_t *w=lb->lb_workers[j];
  -        if( w->route != NULL &&
  -            strcmp( w->route, instanceId ) == 0 ) {
  -            env->l->jkLog(env, env->l, JK_LOG_INFO,
  -                          "lb.updateWorkers() Gracefull shutdown %s %s\n",
  -                          w->channel->mbean->name, instanceId );
  -            w->in_error_state= JK_TRUE;
  -            w->mbean->disabled = JK_TRUE;
  +    for( level=0; level<JK_LB_LEVELS; level++ ) {
  +        for(i = 0 ; i < lb->workerCnt[level] ; i++) {
  +            jk_worker_t *w=lb->workerTables[level][i];
  +
  +            if( w->route != NULL &&
  +                strcmp( w->route, instanceId ) == 0 ) {
  +                env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                              "lb.updateWorkers() Gracefull shutdown %s %s\n",
  +                              w->channel->mbean->name, instanceId );
  +                w->in_error_state= JK_TRUE;
  +                w->mbean->disabled = JK_TRUE;
  +            }
           }
       }
       return JK_OK;
  @@ -239,6 +286,7 @@
       char *tmpBuf;
       jk_bean_t *chBean;
       int rc=JK_OK;
  +    int level;
   
       jk2_map_default_create(env, &chProp, env->tmpPool);
   
  @@ -256,26 +304,29 @@
           return JK_ERR;
       }
   
  -    for( i=0; i< lb->num_of_workers; i++ ) {
  -        jk_worker_t *w=lb->lb_workers[i];
  -        if( w->route &&
  -            strcmp( w->route, instanceId ) == 0 &&
  -            strcmp( w->channel->mbean->name, chName ) == 0 ) {
  -            /* XXX Create a new channel with the update properties,
  -               Then replace it.
  -
  -               At this moment we just re-enable the worker.
  -            */
  -            if( w->mbean->disabled || w->in_error_state ) {
  -                env->l->jkLog(env, env->l, JK_LOG_INFO,
  -                              "lb.updateWorkers() re-enabling %s %s\n",
  -                              w->channel->mbean->name, instanceId );
  -                w->mbean->disabled=JK_FALSE;
  -                w->in_error_state=JK_FALSE;
  +    for( level=0; level<JK_LB_LEVELS; level++ ) {
  +        for(i = 0 ; i < lb->workerCnt[level] ; i++) {
  +            jk_worker_t *w=lb->workerTables[level][i];
  +
  +            if( w->route &&
  +                strcmp( w->route, instanceId ) == 0 &&
  +                strcmp( w->channel->mbean->name, chName ) == 0 ) {
  +                /* XXX Create a new channel with the update properties,
  +                   Then replace it.
  +                   
  +                   At this moment we just re-enable the worker.
  +                */
  +                if( w->mbean->disabled || w->in_error_state ) {
  +                    env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                                  "lb.updateWorkers() re-enabling %s %s\n",
  +                                  w->channel->mbean->name, instanceId );
  +                    w->mbean->disabled=JK_FALSE;
  +                    w->in_error_state=JK_FALSE;
  +                }
  +                
  +                found=JK_TRUE;
  +                break;
               }
  -            
  -            found=JK_TRUE;
  -            break;
           }
       }
   
  @@ -434,18 +485,14 @@
           int rc;
   
           /* Prevent loops */
  -        if( attempt > lb->num_of_workers + 1 ) {
  +        if( attempt > MAX_ATTEMPTS ) {
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                             "lb.service() max attempts exceeded %d\n", attempt);
               return JK_ERR;
           }
           
  -        if( lb->num_of_workers==1 ) {
  -            /* A single worker - no need to search */
  -            rec=lb->lb_workers[0];
  -        } else {
  -            rec=jk2_get_most_suitable_worker(env, lb, s, attempt++);
  -        }
  +        rec=jk2_get_most_suitable_worker(env, lb, s, attempt);
  +        attempt++;
           
           s->is_recoverable_error = JK_FALSE;
   
  @@ -459,21 +506,22 @@
           if( lb->mbean->debug > 0 ) 
               env->l->jkLog(env, env->l, JK_LOG_INFO,
                             "lb.service() try %s\n", rec->mbean->name );
  +        
           if( rec->route==NULL ) {
               rec->route=rec->mbean->localName;
           }
  +        
           s->jvm_route = rec->route;
   
           /* It may be better to do this on the endpoint */
           rec->reqCnt++;
   
           s->realWorker = rec;
  +
           rc = rec->service(env, rec, s);
   
           if(rc==JK_OK) {                        
               rec->in_error_state = JK_FALSE;
  -            rec->in_recovering  = JK_FALSE;
  -            rec->retry_count    = 0;
               rec->error_time     = 0;
               /* the endpoint that succeeded is saved for done() */
               return JK_OK;
  @@ -488,7 +536,6 @@
            * Time for fault tolerance (if possible)...
            */
           rec->in_error_state = JK_TRUE;
  -        rec->in_recovering  = JK_FALSE;
           rec->error_time     = time(0);
           rec->errCnt++;
           
  @@ -505,7 +552,7 @@
            */
           if( lb->mbean->debug > 0 ) {
               env->l->jkLog(env, env->l, JK_LOG_INFO, 
  -                          "lb_worker.service() try other host\n");
  +                          "lb_worker.service() try other hosts\n");
           }
       }
       return JK_ERR;
  @@ -518,65 +565,36 @@
   {
       int currentWorker=0;
       int i;
  +    int level;
       int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
   
  -    if( lb->lb_workers_size < num_of_workers ) {
  -        if( lb->lb_workers_size==0 ) {
  -            lb->lb_workers_size=10;
  -        } else {
  -            lb->lb_workers_size = 2 * lb->lb_workers_size;
  -        }
  -        lb->lb_workers =
  -            lb->mbean->pool->alloc(env, lb->mbean->pool, 
  -                                   lb->lb_workers_size * sizeof(jk_worker_t *));
  -        if(!lb->lb_workers) {
  -            env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                          "lb_worker.validate(): OutOfMemoryException\n");
  -            return JK_ERR;
  -        }
  -    }    
  -
       for(i = 0 ; i < num_of_workers ; i++) {
           char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i);
           jk_worker_t *w= env->getByName( env, name );
  +        int level=0;
  +        int pos=0;
  +
           if( w== NULL ) {
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                             "lb_worker.init(): no worker found %s\n", name);
  -            num_of_workers--;
               continue;
           }
  +
  +        if( w->mbean->disabled ) continue;
           
  -        if( w->lb_factor != 0 ) {
  -            w->lb_factor = 1/ w->lb_factor;
  -            lb->lb_workers[currentWorker]=w;
  -        } else {
  -            /* If == 0, then this is the default worker. Switch it with the first
  -               worker to avoid looking too much for it.
  -             */
  -            jk_worker_t *first=lb->lb_workers[0];
  -            lb->lb_workers[0]=w;
  -            /* Only do the exchange if the worker is not the first */
  -            if( currentWorker > 0 ) {
  -                lb->lb_workers[currentWorker]=first;
  -            }
  -        }
  +        level=w->level;
           
  +        /* It's like disabled */
  +        if( level >= JK_LB_LEVELS ) continue;
  +
  +        pos=lb->workerCnt[level]++;
  +        
  +        lb->workerTables[level][pos]=w;
   
  -        /* 
  -         * Allow using lb in fault-tolerant mode.
  -         * Just set lbfactor in worker.properties to 0 to have 
  -         * a worker used only when principal is down or session route
  -         * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]>
  -         */
           w->lb_value = w->lb_factor;
           w->in_error_state = JK_FALSE;
  -        w->in_recovering  = JK_FALSE;
  -        w->retry_count  = 0;
  -
  -        currentWorker++;
       }
       
  -    lb->num_of_workers=num_of_workers;
       return JK_OK;
   }
   
  @@ -587,12 +605,15 @@
       lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, "");
       
       env->l->jkLog(env, env->l, JK_LOG_INFO,
  -                  "lb_worker.setAttribute(): Adding %s %s\n", lb->mbean->name, 
name);
  +                  "lb_worker.setAttribute(): Adding to %s: %s\n", 
lb->mbean->localName, name);
   
   }
   
  -static int JK_METHOD jk2_lb_setProperty(jk_env_t *env, jk_bean_t *mbean, 
  -                                        char *name, void *valueP)
  +static char *jk2_worker_lb_multiValueInfo[]={"worker", NULL };
  +static char *jk2_worker_lb_setAttributeInfo[]={"debug", NULL };
  +
  +static int JK_METHOD jk2_lb_setAttribute(jk_env_t *env, jk_bean_t *mbean, 
  +                                         char *name, void *valueP)
   {
       jk_worker_t *lb=mbean->object;
       char *value=valueP;
  @@ -602,20 +623,7 @@
       unsigned i = 0;
       char *tmp;
       
  -    if( strcmp( name, "balanced_workers") == 0 ) {
  -        worker_names=jk2_config_split( env,  lb->mbean->pool,
  -                                       value, NULL, &num_of_workers );
  -        if( worker_names==NULL || num_of_workers==0 ) {
  -            env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                          "lb_worker.validate(): no defined workers\n");
  -            return JK_ERR;
  -        }
  -        for(i = 0 ; i < num_of_workers ; i++) {
  -            jk2_lb_addWorker( env, lb, worker_names[i]);
  -        }
  -        jk2_lb_refresh( env, lb );
  -        return JK_OK;
  -    } else if( strcmp( name, "worker") == 0 ) {
  +    if( strcmp( name, "worker") == 0 ) {
           jk2_lb_addWorker( env, lb, value);
           jk2_lb_refresh( env, lb );
           return JK_OK;
  @@ -630,6 +638,7 @@
       char **worker_names;
       int i = 0;
       char *tmp;
  +    int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
   
       err=jk2_lb_refresh(env, lb );
       if( err != JK_OK )
  @@ -641,7 +650,7 @@
   
       env->l->jkLog(env, env->l, JK_LOG_INFO,
                     "lb.init() %s %d workers\n",
  -                  lb->mbean->name, lb->num_of_workers );
  +                  lb->mbean->name, num_of_workers );
       
       return JK_OK;
   }
  @@ -660,6 +669,7 @@
                                       jk_bean_t *result, char *type, char *name)
   {
       jk_worker_t *w;
  +    int i;
       
       if(NULL == name ) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
  @@ -675,16 +685,20 @@
           return JK_ERR;
       }
   
  -    w->lb_workers = NULL;
  -    w->num_of_workers = 0;
       w->worker_private = NULL;
       w->init           = jk2_lb_init;
       w->destroy        = jk2_lb_destroy;
       w->service        = jk2_lb_service;
  -   
  +    
  +    for( i=0; i<JK_LB_LEVELS; i++ ) {
  +        w->workerCnt[i]=0;
  +    }
  +    
       jk2_map_default_create(env,&w->lbWorkerMap, pool);
   
  -    result->setAttribute=jk2_lb_setProperty;
  +    result->setAttribute=jk2_lb_setAttribute;
  +    result->multiValueInfo=jk2_worker_lb_multiValueInfo;
  +    result->setAttributeInfo=jk2_worker_lb_setAttributeInfo;
       result->object=w;
       w->mbean=result;
   
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to