costin 02/01/25 23:03:42 Modified: jk/native2/common jk_ajp14_worker.c Log: That's one of the biggest changes. First, the endpoint management is now more explicit and direct - since this worker is sending the request, it needs an endpoint and recycles it. Second, we treat jni as a special case, with a different way to handle it. What's nice is that now almost all of the code is shared and common - we do get reused endpoints for jni and all the good stuff from ajp ( including efficient c2b and less GC ). Revision Changes Path 1.14 +173 -125 jakarta-tomcat-connectors/jk/native2/common/jk_ajp14_worker.c Index: jk_ajp14_worker.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_ajp14_worker.c,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- jk_ajp14_worker.c 12 Jan 2002 05:05:12 -0000 1.13 +++ jk_ajp14_worker.c 26 Jan 2002 07:03:42 -0000 1.14 @@ -72,35 +72,7 @@ #include "jk_service.h" #include "jk_env.h" #include "jk_objCache.h" -#include "jk_ajp14.h" - -int JK_METHOD jk_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool, void **result, - const char *type, const char *name); - -static int JK_METHOD -jk_worker_ajp14_service(jk_env_t *env, jk_endpoint_t *e, - jk_ws_service_t *s, - int *is_recoverable_error); - -static int JK_METHOD -jk_worker_ajp14_validate(jk_env_t *env, jk_worker_t *_this, - jk_map_t *props, - jk_workerEnv_t *we ); - -static int JK_METHOD -jk_worker_ajp14_done(jk_env_t *env, jk_endpoint_t *e); - -static int JK_METHOD -jk_worker_ajp14_getEndpoint(jk_env_t *env, jk_worker_t *_this, - jk_endpoint_t **e); - -static int JK_METHOD -jk_worker_ajp14_init(jk_env_t *env, jk_worker_t *_this, - jk_map_t *props, - jk_workerEnv_t *we); - -static int JK_METHOD -jk_worker_ajp14_destroy(jk_env_t *env, jk_worker_t *_this); +#include "jk_registry.h" #define AJP_DEF_RETRY_ATTEMPTS (2) @@ -114,37 +86,6 @@ /* -------------------- Impl -------------------- 
*/ -int JK_METHOD jk_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool, - void **result, - const char *type, const char *name) -{ - jk_worker_t *w=(jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t)); - - if (name == NULL || w == NULL) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "ajp14.factory() NullPointerException\n"); - return JK_FALSE; - } - w->pool = pool; - w->name = NULL; - - w->proto= AJP14_PROTO; - - w->endpointCache= NULL; - w->connect_retry_attempts= AJP_DEF_RETRY_ATTEMPTS; - - w->channel= NULL; - w->secret= NULL; - - w->validate= jk_worker_ajp14_validate; - w->init= jk_worker_ajp14_init; - w->get_endpoint= jk_worker_ajp14_getEndpoint; - w->destroy=jk_worker_ajp14_destroy; - - *result = w; - - return JK_TRUE; -} /* * Initialize the worker. @@ -199,13 +140,11 @@ } } - _this->channel->setProperty( env, _this->channel, "defaultPort", "8007" ); - err=_this->channel->init( env, _this->channel, props, p->name, _this); if( err != JK_TRUE ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, - "ajp14.validate(): resolve failed\n"); + "ajp14.validate(): channel init failed\n"); return err; } @@ -260,7 +199,7 @@ jk_serialize_ping( env, msg, ae ); - err = msg->send( env, msg, ae ); + err = ae->worker->channel->send( env, ae->worker->channel, ae, msg ); /* Move to 'slave' mode, listening to messages */ err=ae->worker->workerEnv->processCallbacks( env, ae->worker->workerEnv, @@ -274,56 +213,17 @@ return err; } - -/* - * Serve the request, using AJP13/AJP14 - */ -static int JK_METHOD -jk_worker_ajp14_service(jk_env_t *env, jk_endpoint_t *e, - jk_ws_service_t *s, - int *is_recoverable_error) +/** First message in a stream-based connection. If the first send + fails, try to reconnect. +*/ +static int JK_METHOD +jk_worker_ajp14_sendAndReconnect(jk_env_t *env, jk_worker_t *worker, + jk_ws_service_t *s, + jk_endpoint_t *e ) { - int err; int attempt; - int hasPost=JK_FALSE; - - if( ( e== NULL ) - || ( s == NULL ) - || ! 
is_recoverable_error ) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "ajp14.service() NullPointerException\n"); - return JK_FALSE; - } - - /* Prepare the messages we'll use.*/ - e->request->reset( env, e->request ); - e->reply->reset( env, e->reply ); - e->post->reset( env, e->post ); + int err; - e->recoverable = JK_TRUE; - e->uploadfd = -1; /* not yet used, later ;) */ - e->reuse = JK_FALSE; - *is_recoverable_error = JK_TRUE; - /* Up to now, we can recover */ - e->recoverable = JK_TRUE; - - s->left_bytes_to_send = s->content_length; - s->content_read=0; - - /* - * We get here initial request (in reqmsg) - */ - err=jk_serialize_request13(env, e->request, s); - if (err!=JK_TRUE) { - *is_recoverable_error = JK_FALSE; - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "ajp14.service(): error marshaling\n"); - return JK_FALSE; - } - - env->l->jkLog(env, env->l, JK_LOG_INFO, - "ajp14.service() %s\n", e->worker->name); - /* * Try to send the request on a valid endpoint. If one endpoint * fails, close the channel and try again ( maybe tomcat was restarted ) @@ -331,11 +231,12 @@ * XXX JK_RETRIES could be replaced by the number of workers in * a load-balancing configuration */ - for(attempt = 0 ; attempt < e->worker->connect_retry_attempts ;attempt++) { - jk_channel_t *channel=e->worker->channel; + for(attempt = 0 ; attempt < worker->connect_retry_attempts ;attempt++) { + jk_channel_t *channel= worker->channel; /* e->request->dump(env, e->request, "Before sending "); */ - err=e->request->send( env, e->request, e); + err=e->worker->channel->send( env, e->worker->channel, e, + e->request ); if (err==JK_TRUE ) { /* We sent the request, have valid endpoint */ @@ -345,12 +246,12 @@ if( e->recoverable != JK_TRUE ) { env->l->jkLog(env, env->l, JK_LOG_ERROR, "ajp14.service() error sending request %s, giving up\n", - e->worker->name); + worker->name); return JK_FALSE; } env->l->jkLog(env, env->l, JK_LOG_ERROR, - "ajp14.service() error sending, retry on a new endpoint %s\n", + 
"ajp14.service() error sending, retry on a new endpoint %s\n", e->worker->name); channel->close( env, channel, e ); @@ -370,9 +271,23 @@ */ e->recoverable = JK_FALSE; } + return JK_TRUE; +} + + +static int JK_METHOD +jk_worker_ajp14_forwardStream(jk_env_t *env, jk_worker_t *worker, + jk_ws_service_t *s, + jk_endpoint_t *e ) +{ + int err; + + err=jk_worker_ajp14_sendAndReconnect( env, worker, s, e ); + if( err!=JK_TRUE ) + return err; /* We should have a channel now, send the post data */ - *is_recoverable_error = JK_TRUE; + s->is_recoverable_error = JK_TRUE; e->recoverable = JK_TRUE; /* Prepare to send some post data ( ajp13 proto ). We do that after the @@ -390,12 +305,13 @@ if (err != JK_TRUE ) { /* the browser stop sending data, no need to recover */ e->recoverable = JK_FALSE; + s->is_recoverable_error = JK_FALSE; env->l->jkLog(env, env->l, JK_LOG_ERROR, "ajp14.service() Error receiving initial post \n"); return JK_FALSE; } - - err= e->post->send( env, e->post, e ); + err= e->worker->channel->send( env, e->worker->channel, e, + e->post ); } err = e->worker->workerEnv->processCallbacks(env, e->worker->workerEnv, @@ -406,7 +322,7 @@ * upload data and we must consider that operation is no more recoverable */ if (! 
e->recoverable) { - *is_recoverable_error = JK_FALSE; + s->is_recoverable_error = JK_FALSE; env->l->jkLog(env, env->l, JK_LOG_ERROR, "ajp14.service() ajpGetReply unrecoverable error %d\n", err); @@ -418,13 +334,95 @@ "ajp14.service() ajpGetReply recoverable error %d\n", err); } + return err; +} + +static int JK_METHOD +jk_worker_ajp14_forwardSingleThread(jk_env_t *env, jk_worker_t *worker, + jk_ws_service_t *s, + jk_endpoint_t *e ) +{ + int err; + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "ajp14.forwardST() Before calling native channel\n"); + err=e->worker->channel->send( env, e->worker->channel, e, + e->request ); + env->l->jkLog(env, env->l, JK_LOG_INFO, + "ajp14.forwardST() After %d\n",err); + + + return JK_TRUE; +} + +static int JK_METHOD +jk_worker_ajp14_service1(jk_env_t *env, jk_worker_t *w, + jk_ws_service_t *s, + jk_endpoint_t *e ) +{ + int err; + + if( ( e== NULL ) || ( s == NULL ) ) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "ajp14.service() NullPointerException\n"); + return JK_FALSE; + } + + e->currentRequest=s; + + /* Prepare the messages we'll use.*/ + e->request->reset( env, e->request ); + e->reply->reset( env, e->reply ); + e->post->reset( env, e->post ); + + e->uploadfd = -1; /* not yet used, later ;) */ + e->reuse = JK_FALSE; + s->is_recoverable_error = JK_TRUE; + /* Up to now, we can recover */ + e->recoverable = JK_TRUE; + + s->left_bytes_to_send = s->content_length; + s->content_read=0; + + /* + * We get here initial request (in reqmsg) + */ + err=jk_serialize_request13(env, e->request, s); + if (err!=JK_TRUE) { + s->is_recoverable_error = JK_FALSE; + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "ajp14.service(): error marshaling\n"); + return JK_FALSE; + } + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "ajp14.service() %s\n", e->worker->name); + + if( w->channel->beforeRequest != NULL ) { + w->channel->beforeRequest( env, w->channel, w, e, s ); + } + + /* First message for this request */ + if( w->channel->is_stream == JK_TRUE ) { + 
err=jk_worker_ajp14_forwardStream( env, w, s, e ); + } else { + err=jk_worker_ajp14_forwardSingleThread( env, w, s, e ); + } + + if( w->channel->afterRequest != NULL ) { + w->channel->afterRequest( env, w->channel, w, e, s ); + } + + e->currentRequest=NULL; return err; } + + static int JK_METHOD -jk_worker_ajp14_done(jk_env_t *env, jk_endpoint_t *e) +jk_worker_ajp14_done(jk_env_t *env, jk_worker_t *we, jk_endpoint_t *e) { jk_worker_t *w; @@ -449,7 +447,7 @@ env->l->jkLog(env, env->l, JK_LOG_INFO, "ajp14.done() close endpoint %s\n", w->name ); - + return JK_TRUE; } @@ -501,13 +499,31 @@ e->worker = _this; e->proto = _this->proto; e->channelData = NULL; - e->service = jk_worker_ajp14_service; - e->done = jk_worker_ajp14_done; *eP = e; return JK_TRUE; } +/* + * Serve the request, using AJP13/AJP14 + */ +static int JK_METHOD +jk_worker_ajp14_service(jk_env_t *env, jk_worker_t *w, + jk_ws_service_t *s) +{ + int err; + jk_endpoint_t *e; + + /* Get endpoint from the pool */ + jk_worker_ajp14_getEndpoint( env, w, &e ); + + err=jk_worker_ajp14_service1( env, w, s, e ); + + jk_worker_ajp14_done( env, w, e); + return err; +} + + static int JK_METHOD jk_worker_ajp14_init(jk_env_t *env, jk_worker_t *_this, jk_map_t *props, @@ -597,7 +613,7 @@ { int i; - env->l->jkLog(env, env->l, JK_LOG_DEBUG, + env->l->jkLog(env, env->l, JK_LOG_INFO, "ajp14.destroy()\n"); if( _this->endpointCache != NULL ) { @@ -622,6 +638,38 @@ } _this->pool->close( env, _this->pool ); + + return JK_TRUE; +} + +int JK_METHOD jk_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool, + void **result, + const char *type, const char *name) +{ + jk_worker_t *w=(jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t)); + + if (name == NULL || w == NULL) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "ajp14.factory() NullPointerException\n"); + return JK_FALSE; + } + w->pool = pool; + w->name = NULL; + + w->proto= AJP14_PROTO; + + w->endpointCache= NULL; + w->connect_retry_attempts= AJP_DEF_RETRY_ATTEMPTS; + + 
w->channel= NULL; + w->secret= NULL; + + w->validate= jk_worker_ajp14_validate; + w->init= jk_worker_ajp14_init; + w->destroy=jk_worker_ajp14_destroy; + w->service = jk_worker_ajp14_service; + + *result = w; return JK_TRUE; }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>