costin 02/05/09 14:06:48 Modified: jk/native2/common jk_worker_lb.c Log: That's the big one. Please review ! It changes the handling of lb_value to int. I also cleaned up the logic so it's easier ( I hope ) to understand what's happening. "Levels" replace the 'local worker', I think I got the logic straight for those. I started to add 'introspection' data, to validate and better report the config. We use one table per level. At the moment the maximum number of workers is hardcoded ( to 255 ), we could make it dynamic but that would make things pretty complex when we add workers dynamically ( it won't work without a critical section (CS) or atomic operations ) Revision Changes Path 1.12 +195 -181 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c Index: jk_worker_lb.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v retrieving revision 1.11 retrieving revision 1.12 diff -u -r1.11 -r1.12 --- jk_worker_lb.c 9 May 2002 00:01:43 -0000 1.11 +++ jk_worker_lb.c 9 May 2002 21:06:48 -0000 1.12 @@ -77,6 +77,7 @@ /* XXX make it longer - debugging only */ #define WAIT_BEFORE_RECOVER (5) +#define MAX_ATTEMPTS 3 /** Find the best worker. In process, check if timeout expired for workers that failed in the past and give them another chance. 
@@ -90,115 +91,157 @@ jk_ws_service_t *s, int attempt) { jk_worker_t *rc = NULL; - double lb_min = 0.0; + int lb_min = 0; + int lb_max = 0; int i; + int j; + int level; + int currentLevel=JK_LB_LEVELS - 1; char *session_route; time_t now = 0; session_route = jk2_requtil_getSessionRoute(env, s); if(session_route) { - for(i = 0 ; i < lb->num_of_workers ; i++) { - jk_worker_t *w=lb->lb_workers[i]; - - if(w->route != NULL && - 0 == strcmp(session_route, w->route)) { - if(attempt > 0 && w->in_error_state) { - break; - } else { - return w; - } + for( level=0; level<JK_LB_LEVELS; level++ ) { + for(i = 0 ; i < lb->workerCnt[level]; i++) { + jk_worker_t *w=lb->workerTables[level][i]; + + if(w->route != NULL && + 0 == strcmp(session_route, w->route)) { + if(attempt > 0 && w->in_error_state) { + /* We already tried to revive this worker. */ + break; + } else { + return w; + } + } } } } - /** Get one worker that is ready */ - for(i = 0 ; i < lb->num_of_workers ; i++) { - jk_worker_t *w=lb->lb_workers[i]; - - if(w->in_error_state) { + /** Get one worker that is ready + */ + for( level=0; level<JK_LB_LEVELS; level++ ) { + for(i = 0 ; i < lb->workerCnt[level] ; i++) { + jk_worker_t *w=lb->workerTables[level][i]; + if( w->mbean->disabled ) continue; - - /* Check if it's ready for recovery */ - /* if(!lb->lb_workers[i]->in_recovering) { */ - if( now==0 ) - now = time(NULL); - - if((now - w->error_time) > WAIT_BEFORE_RECOVER) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb.getWorker() timeout expired, reenable again %s\n", - w->mbean->name); - - w->in_recovering = JK_TRUE; - w->in_error_state = JK_FALSE; + if( w->in_error_state ) continue; - /* No need to do that - if it'll be used again, then error time - will be set automatically on error */ - /* w->error_time = now; */ - /* Not sure we need that either */ - /* w->retry_count++; */ - - /* The worker's error state is reset, but that doesn't - mean it'll be used - normal priority selection happens - Don't give bigger priority to 
recovered workers - */ - /* rc = lb->lb_workers[i]; - break; - */ + if( rc==NULL ) { + rc=w; + currentLevel=level; + lb_min=w->lb_value; + continue; } - } - if( ! lb->lb_workers[i]->in_error_state ) { - if(lb->lb_workers[i]->lb_value == 0 ) { - /* That's the 'default' worker, it'll take all requests. - * All other workers are not used unless this is in error state. - * - * The 'break' will disable checking for recovery on other - * workers - but that doesn't matter as long as the default is alive. - */ - rc=lb->lb_workers[i]; - break; - } - if(lb->lb_workers[i]->lb_value < lb_min || - ( rc==NULL ) ) { - lb_min = lb->lb_workers[i]->lb_value; - rc = lb->lb_workers[i]; + + if( w->lb_value < lb_min ) { + lb_min = w->lb_value; + rc = w; + currentLevel=level; } } + + if( rc!=NULL ) { + /* We found a valid worker on the current level, don't worry about the + higher levels. + */ + break; + } } + + /** Reenable workers in error state if the timeout has passed. + * Don't bother with 'higher' levels, since we'll never try them. + */ + for( level=0; level<=currentLevel; level++ ) { + + for(i = 0 ; i < lb->workerCnt[level] ; i++) { + jk_worker_t *w=lb->workerTables[level][i]; + + if( w->mbean->disabled ) continue; + if(w->in_error_state) { + /* Check if it's ready for recovery */ + if( now==0 ) now = time(NULL); + + if((now - w->error_time) > WAIT_BEFORE_RECOVER) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "lb.getWorker() reenable %s\n", w->mbean->name); + w->in_error_state = JK_FALSE; + + /* Find max lb */ + if( lb_max ==0 ) { + for( j=0; j<lb->workerCnt[level]; j++ ) { + if( lb->workerTables[level][j]->lb_value > lb_max ) { + lb_max=lb->workerTables[level][j]->lb_value; + } + } + } + w->lb_value = lb_max; + } + } + } + } + + /* If no active worker is found, we'll try all workers in error_state, + */ if ( rc==NULL ) { /* no workers found (rc is null), now try as hard as possible to get a worker anyway, pick one with largest error time.. 
*/ - env->l->jkLog(env, env->l, JK_LOG_ERROR, + + env->l->jkLog(env, env->l, JK_LOG_INFO, "lb.getWorker() All workers in error state, use the one with oldest error\n"); - for(i = 0 ; i < lb->num_of_workers ; i++) { - jk_worker_t *w=lb->lb_workers[i]; - - if( w->mbean->disabled == JK_TRUE ) continue; - - if ( rc != NULL ) { + for( level=0; level<JK_LB_LEVELS; level++ ) { + for(i = 0 ; i < lb->workerCnt[level] ; i++) { + jk_worker_t *w=lb->workerTables[level][i]; + + if( w->mbean->disabled == JK_TRUE ) continue; + + if( rc==NULL ) { + rc= w; + currentLevel=level; + continue; + } /* pick the oldest failed worker */ if ( w->error_time < rc->error_time ) { + currentLevel=level; rc = w; } - } else { - rc = w; } } - - if ( rc && rc->in_error_state ) { - rc->in_recovering = JK_TRUE; - rc->in_error_state = JK_FALSE; - } } - if(rc) { + if(rc!=NULL) { + /* It it's the default, it'll remain the default - we don't + increase the factor + */ + rc->in_error_state = JK_FALSE; if( rc->lb_value != 0 ) { - /* It it's the default, it'll remain the default - we don't - increase the factor - */ - rc->lb_value += rc->lb_factor; + int newValue=rc->lb_value + rc->lb_factor; + + if( newValue > 255 ) { + rc->lb_value=rc->lb_factor; + /* Roll over. This has 2 goals: + - avoid the lb factor becoming too big, and give a chance to run to + workers that were in error state ( I think it's cleaner than looking for "max" ) + - the actual lb_value will be 1 byte. Even on the craziest platform, that + will be an atomic write. We do a lot of operations on lb_value in a MT environment, + and the chance of reading something inconsistent is considerable. Since APR + will not support atomic - and adding a CS would cost too much, this is actually + a good solution. + + Note that lb_value is not used for anything critical - just to balance the load, + the worst that may happen is having a worker stay idle for 255 requests. 
+ */ + for(i = 0 ; i < lb->workerCnt[currentLevel] ; i++) { + jk_worker_t *w=lb->workerTables[currentLevel][i]; + w->lb_value=w->lb_factor; + } + } else { + rc->lb_value=newValue; + } } } @@ -210,17 +253,21 @@ jk_worker_t *lb, char *instanceId ) { - int j; + int i; + int level; - for( j=0; j< lb->num_of_workers; j++ ) { - jk_worker_t *w=lb->lb_workers[j]; - if( w->route != NULL && - strcmp( w->route, instanceId ) == 0 ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() Gracefull shutdown %s %s\n", - w->channel->mbean->name, instanceId ); - w->in_error_state= JK_TRUE; - w->mbean->disabled = JK_TRUE; + for( level=0; level<JK_LB_LEVELS; level++ ) { + for(i = 0 ; i < lb->workerCnt[level] ; i++) { + jk_worker_t *w=lb->workerTables[level][i]; + + if( w->route != NULL && + strcmp( w->route, instanceId ) == 0 ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "lb.updateWorkers() Gracefull shutdown %s %s\n", + w->channel->mbean->name, instanceId ); + w->in_error_state= JK_TRUE; + w->mbean->disabled = JK_TRUE; + } } } return JK_OK; @@ -239,6 +286,7 @@ char *tmpBuf; jk_bean_t *chBean; int rc=JK_OK; + int level; jk2_map_default_create(env, &chProp, env->tmpPool); @@ -256,26 +304,29 @@ return JK_ERR; } - for( i=0; i< lb->num_of_workers; i++ ) { - jk_worker_t *w=lb->lb_workers[i]; - if( w->route && - strcmp( w->route, instanceId ) == 0 && - strcmp( w->channel->mbean->name, chName ) == 0 ) { - /* XXX Create a new channel with the update properties, - Then replace it. - - At this moment we just re-enable the worker. 
- */ - if( w->mbean->disabled || w->in_error_state ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() re-enabling %s %s\n", - w->channel->mbean->name, instanceId ); - w->mbean->disabled=JK_FALSE; - w->in_error_state=JK_FALSE; + for( level=0; level<JK_LB_LEVELS; level++ ) { + for(i = 0 ; i < lb->workerCnt[level] ; i++) { + jk_worker_t *w=lb->workerTables[level][i]; + + if( w->route && + strcmp( w->route, instanceId ) == 0 && + strcmp( w->channel->mbean->name, chName ) == 0 ) { + /* XXX Create a new channel with the update properties, + Then replace it. + + At this moment we just re-enable the worker. + */ + if( w->mbean->disabled || w->in_error_state ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "lb.updateWorkers() re-enabling %s %s\n", + w->channel->mbean->name, instanceId ); + w->mbean->disabled=JK_FALSE; + w->in_error_state=JK_FALSE; + } + + found=JK_TRUE; + break; } - - found=JK_TRUE; - break; } } @@ -434,18 +485,14 @@ int rc; /* Prevent loops */ - if( attempt > lb->num_of_workers + 1 ) { + if( attempt > MAX_ATTEMPTS ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb.service() max attempts exceeded %d\n", attempt); return JK_ERR; } - if( lb->num_of_workers==1 ) { - /* A single worker - no need to search */ - rec=lb->lb_workers[0]; - } else { - rec=jk2_get_most_suitable_worker(env, lb, s, attempt++); - } + rec=jk2_get_most_suitable_worker(env, lb, s, attempt); + attempt++; s->is_recoverable_error = JK_FALSE; @@ -459,21 +506,22 @@ if( lb->mbean->debug > 0 ) env->l->jkLog(env, env->l, JK_LOG_INFO, "lb.service() try %s\n", rec->mbean->name ); + if( rec->route==NULL ) { rec->route=rec->mbean->localName; } + s->jvm_route = rec->route; /* It may be better to do this on the endpoint */ rec->reqCnt++; s->realWorker = rec; + rc = rec->service(env, rec, s); if(rc==JK_OK) { rec->in_error_state = JK_FALSE; - rec->in_recovering = JK_FALSE; - rec->retry_count = 0; rec->error_time = 0; /* the endpoint that succeeded is saved for done() */ return JK_OK; @@ 
-488,7 +536,6 @@ * Time for fault tolerance (if possible)... */ rec->in_error_state = JK_TRUE; - rec->in_recovering = JK_FALSE; rec->error_time = time(0); rec->errCnt++; @@ -505,7 +552,7 @@ */ if( lb->mbean->debug > 0 ) { env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb_worker.service() try other host\n"); + "lb_worker.service() try other hosts\n"); } } return JK_ERR; @@ -518,65 +565,36 @@ { int currentWorker=0; int i; + int level; int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap); - if( lb->lb_workers_size < num_of_workers ) { - if( lb->lb_workers_size==0 ) { - lb->lb_workers_size=10; - } else { - lb->lb_workers_size = 2 * lb->lb_workers_size; - } - lb->lb_workers = - lb->mbean->pool->alloc(env, lb->mbean->pool, - lb->lb_workers_size * sizeof(jk_worker_t *)); - if(!lb->lb_workers) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb_worker.validate(): OutOfMemoryException\n"); - return JK_ERR; - } - } - for(i = 0 ; i < num_of_workers ; i++) { char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i); jk_worker_t *w= env->getByName( env, name ); + int level=0; + int pos=0; + if( w== NULL ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.init(): no worker found %s\n", name); - num_of_workers--; continue; } + + if( w->mbean->disabled ) continue; - if( w->lb_factor != 0 ) { - w->lb_factor = 1/ w->lb_factor; - lb->lb_workers[currentWorker]=w; - } else { - /* If == 0, then this is the default worker. Switch it with the first - worker to avoid looking too much for it. - */ - jk_worker_t *first=lb->lb_workers[0]; - lb->lb_workers[0]=w; - /* Only do the exchange if the worker is not the first */ - if( currentWorker > 0 ) { - lb->lb_workers[currentWorker]=first; - } - } + level=w->level; + /* It's like disabled */ + if( level >= JK_LB_LEVELS ) continue; + + pos=lb->workerCnt[level]++; + + lb->workerTables[level][pos]=w; - /* - * Allow using lb in fault-tolerant mode. 
- * Just set lbfactor in worker.properties to 0 to have - * a worker used only when principal is down or session route - * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]> - */ w->lb_value = w->lb_factor; w->in_error_state = JK_FALSE; - w->in_recovering = JK_FALSE; - w->retry_count = 0; - - currentWorker++; } - lb->num_of_workers=num_of_workers; return JK_OK; } @@ -587,12 +605,15 @@ lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, ""); env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb_worker.setAttribute(): Adding %s %s\n", lb->mbean->name, name); + "lb_worker.setAttribute(): Adding to %s: %s\n", lb->mbean->localName, name); } -static int JK_METHOD jk2_lb_setProperty(jk_env_t *env, jk_bean_t *mbean, - char *name, void *valueP) +static char *jk2_worker_lb_multiValueInfo[]={"worker", NULL }; +static char *jk2_worker_lb_setAttributeInfo[]={"debug", NULL }; + +static int JK_METHOD jk2_lb_setAttribute(jk_env_t *env, jk_bean_t *mbean, + char *name, void *valueP) { jk_worker_t *lb=mbean->object; char *value=valueP; @@ -602,20 +623,7 @@ unsigned i = 0; char *tmp; - if( strcmp( name, "balanced_workers") == 0 ) { - worker_names=jk2_config_split( env, lb->mbean->pool, - value, NULL, &num_of_workers ); - if( worker_names==NULL || num_of_workers==0 ) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb_worker.validate(): no defined workers\n"); - return JK_ERR; - } - for(i = 0 ; i < num_of_workers ; i++) { - jk2_lb_addWorker( env, lb, worker_names[i]); - } - jk2_lb_refresh( env, lb ); - return JK_OK; - } else if( strcmp( name, "worker") == 0 ) { + if( strcmp( name, "worker") == 0 ) { jk2_lb_addWorker( env, lb, value); jk2_lb_refresh( env, lb ); return JK_OK; @@ -630,6 +638,7 @@ char **worker_names; int i = 0; char *tmp; + int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap); err=jk2_lb_refresh(env, lb ); if( err != JK_OK ) @@ -641,7 +650,7 @@ env->l->jkLog(env, env->l, JK_LOG_INFO, "lb.init() %s %d workers\n", - lb->mbean->name, lb->num_of_workers ); + 
lb->mbean->name, num_of_workers ); return JK_OK; } @@ -660,6 +669,7 @@ jk_bean_t *result, char *type, char *name) { jk_worker_t *w; + int i; if(NULL == name ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, @@ -675,16 +685,20 @@ return JK_ERR; } - w->lb_workers = NULL; - w->num_of_workers = 0; w->worker_private = NULL; w->init = jk2_lb_init; w->destroy = jk2_lb_destroy; w->service = jk2_lb_service; - + + for( i=0; i<JK_LB_LEVELS; i++ ) { + w->workerCnt[i]=0; + } + jk2_map_default_create(env,&w->lbWorkerMap, pool); - result->setAttribute=jk2_lb_setProperty; + result->setAttribute=jk2_lb_setAttribute; + result->multiValueInfo=jk2_worker_lb_multiValueInfo; + result->setAttributeInfo=jk2_worker_lb_setAttributeInfo; result->object=w; w->mbean=result;
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>