Author: damitha
Date: Sat Jun 14 06:44:21 2008
New Revision: 667798
URL: http://svn.apache.org/viewvc?rev=667798&view=rev
Log:
Making RM 1.0 single channel work without sender threads
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
Sat Jun 14 06:44:21 2008
@@ -142,10 +142,10 @@
const axutil_env_t *env,
axis2_char_t *dbname);
-/*AXIS2_EXTERN axis2_char_t* AXIS2_CALL
-sandesha2_utils_get_svr_side_incoming_seq_id(
- const axutil_env_t *env,
- axis2_char_t *incoming_seq_id);*/
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+sandesha2_utils_get_rmd_seq_id(
+ const axutil_env_t *env,
+ axis2_char_t *internal_sequence_id);
/**
* Caller must free the returned string.
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
Sat Jun 14 06:44:21 2008
@@ -29,7 +29,7 @@
#include <platforms/axutil_platform_auto_sense.h>
#include <ctype.h>
-#define SANDESHA2_MAX_COUNT 8
+#define SANDESHA2_MAX_COUNT 2
/* on_complete callback function */
axis2_status_t AXIS2_CALL
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
Sat Jun 14 06:44:21 2008
@@ -29,7 +29,7 @@
#include <axis2_options.h>
#include <ctype.h>
-#define SANDESHA2_MAX_COUNT 8
+#define SANDESHA2_MAX_COUNT 2
static void
usage(
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
Sat Jun 14 06:44:21 2008
@@ -23,7 +23,7 @@
#include <sandesha2_constants.h>
#include <ctype.h>
-#define SANDESHA2_SLEEP 5
+#define SANDESHA2_SLEEP 2
axiom_node_t *
build_om_programatically(
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
Sat Jun 14 06:44:21 2008
@@ -336,9 +336,12 @@
env, retrans_list, j);
if(retrans_bean)
{
+ axis2_char_t *msg_id = NULL;
+
int msg_type =
sandesha2_sender_bean_get_msg_type(retrans_bean, env);
+ msg_id = sandesha2_sender_bean_get_msg_id(retrans_bean, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Removing the sender bean with type %d",
msg_type);
+ "[sandesha2] Removing the sender bean with type %d and
msg_id:%s", msg_type, msg_id);
sandesha2_sender_mgr_remove(sender_mgr, env,
sandesha2_sender_bean_get_msg_id(retrans_bean, env));
sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env,
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
Sat Jun 14 06:44:21 2008
@@ -49,6 +49,7 @@
#include <sandesha2_terminate_mgr.h>
#include <axis2_const.h>
+#include <axutil_types.h>
#include <axis2_msg_ctx.h>
#include <axutil_string.h>
#include <axis2_engine.h>
@@ -73,6 +74,32 @@
#define SANDESHA2_INTF_TO_IMPL(msg_proc) \
((sandesha2_app_msg_processor_impl_t *)(msg_proc))
+typedef struct sandesha2_app_msg_processor_args
sandesha2_app_msg_processor_args_t;
+
+struct sandesha2_app_msg_processor_args /* Argument bundle handed as the void *data of the application message worker thread function. */
+{
+ axutil_env_t *env; /* environment the worker thread runs with; the starter fills it from axutil_init_thread_env() */
+ axis2_conf_ctx_t *conf_ctx; /* configuration context; the worker passes it to sandesha2_util_get_dbname() and to the resend call */
+ axis2_char_t *internal_sequence_id; /* internal sequence id the worker polls the sender manager with */
+ axis2_char_t *msg_id; /* id of the application message the worker polls for and resends */
+ axis2_bool_t is_server_side; /* forwarded unchanged to sandesha2_app_msg_processor_resend() */
+ int sleep_time; /* argument given to AXIS2_SLEEP() at the top of every polling iteration */
+};
+
+static void * AXIS2_THREAD_FUNC
+sandesha2_app_msg_processor_application_msg_worker_function(
+ axutil_thread_t *thd,
+ void *data);
+
+static axis2_status_t
+sandesha2_app_msg_processor_start_application_msg_resender(
+ const axutil_env_t *env,
+ axis2_conf_ctx_t *conf_ctx,
+ axis2_char_t *internal_sequence_id,
+ axis2_char_t *msg_id,
+ const axis2_bool_t is_server_side,
+ int sleep_time);
+
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_in_msg (
sandesha2_msg_processor_t *msg_processor,
@@ -101,8 +128,7 @@
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_app_msg_response(
const axutil_env_t *env,
- axis2_msg_ctx_t *app_msg_ctx,
- sandesha2_storage_mgr_t *storage_mgr);
+ axis2_msg_ctx_t *app_msg_ctx);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_send_create_seq_msg(
@@ -1657,6 +1683,7 @@
{
AXIS2_FREE(env->allocator, acks_to_str);
}
+ AXIS2_LOG_INFO(env->log, "[Sandesha2]
Exit:sandesha2_app_msg_processor_send_ack_if_reqd");
return AXIS2_SUCCESS;
}
@@ -1875,8 +1902,11 @@
if(axis2_engine_send(engine, env, create_seq_msg))
{
- status = sandesha2_app_msg_processor_process_create_seq_response(env,
create_seq_msg,
- storage_mgr);
+ if(!axis2_msg_ctx_get_server_side(create_seq_msg, env))
+ {
+ status =
sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg,
+ storage_mgr);
+ }
}
else
{
@@ -1895,6 +1925,7 @@
transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
while(!rms_sequence_bean)
{
+ AXIS2_SLEEP(2);
if(transport_sender)
{
/* This is neccessary to avoid a double free */
@@ -1905,12 +1936,15 @@
}
}
- status = sandesha2_app_msg_processor_process_create_seq_response(env,
create_seq_msg,
- storage_mgr);
-
- if(AXIS2_SUCCESS != status)
+ if(!axis2_msg_ctx_get_server_side(create_seq_msg, env))
{
- break;
+ status =
sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg,
+ storage_mgr);
+
+ if(AXIS2_SUCCESS != status)
+ {
+ break;
+ }
}
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr,
env, internal_sequence_id,
@@ -1961,13 +1995,12 @@
create_seq_msg_ctx, soap_ns_uri);
if(!res_envelope)
{
- /* There is no response message context. But in single channel
duplex scenario there should be
- * an CSR in the back channel. So return failure.
+ /* There is no response message context.
*/
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Response
envelope not found");
- return AXIS2_FAILURE;
+ return AXIS2_SUCCESS;
}
}
@@ -2125,12 +2158,13 @@
axis2_msg_ctx_t *app_msg_ctx = NULL;
sandesha2_seq_property_bean_t *to_bean = NULL;
sandesha2_seq_property_bean_t *reply_to_bean = NULL;
- sandesha2_seq_property_bean_t *acks_to_bean = NULL;
+ sandesha2_seq_property_bean_t *from_acks_to_bean = NULL;
sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
axis2_endpoint_ref_t *to_epr = NULL;
axis2_endpoint_ref_t *reply_to_epr = NULL;
- axis2_char_t *acks_to_addr = NULL;
+ axis2_char_t *from_acks_to_addr = NULL;
axis2_char_t *to_addr = NULL;
+ axis2_char_t *reply_to_addr = NULL;
axis2_char_t *new_to_str = NULL;
sandesha2_seq_t *seq = NULL;
sandesha2_seq_t *req_seq = NULL;
@@ -2139,8 +2173,7 @@
sandesha2_msg_number_t *msg_number = NULL;
axis2_msg_ctx_t *req_msg = NULL;
axis2_char_t *str_identifier = NULL;
- sandesha2_sender_bean_t *app_msg_entry = NULL;
- sandesha2_seq_property_bean_t *internal_seq_bean = NULL;
+ sandesha2_sender_bean_t *app_msg_bean = NULL;
long millisecs = 0;
axutil_property_t *property = NULL;
axis2_engine_t *engine = NULL;
@@ -2148,11 +2181,20 @@
axis2_char_t *msg_id = NULL;
axis2_bool_t last_msg = AXIS2_FALSE;
axis2_op_ctx_t *temp_op_ctx = NULL;
- axis2_status_t status = AXIS2_FAILURE;
+ axis2_status_t status = AXIS2_SUCCESS;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_bool_t is_svr_side = AXIS2_FALSE;
axis2_bool_t continue_sending = AXIS2_TRUE;
int msg_type = -1;
+ sandesha2_msg_ctx_t *req_rm_msg_ctx = NULL;
+ axis2_msg_ctx_t *req_msg_ctx = NULL;
+ axis2_op_ctx_t *op_ctx = NULL;
+ axis2_char_t *rmd_sequence_id = NULL;
+ axis2_module_desc_t *module_desc = NULL;
+ axutil_qname_t *qname = NULL;
+ axutil_param_t *sleep_time_param = NULL;
+ int sleep_time = 0;
+ axis2_conf_t *conf = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[Sandesha2] Entry:sandesha2_app_msg_processor_send_app_msg");
@@ -2171,9 +2213,6 @@
reply_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id,
SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
- acks_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id,
- SANDESHA2_SEQ_PROP_ACKS_TO_EPR);
-
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
@@ -2193,27 +2232,13 @@
if(reply_to_bean)
{
- reply_to_epr = axis2_endpoint_ref_create(env,
sandesha2_seq_property_bean_get_value(
- reply_to_bean, env));
+ reply_to_addr = axutil_strdup(env,
sandesha2_seq_property_bean_get_value(reply_to_bean, env));
+ reply_to_epr = axis2_endpoint_ref_create(env, reply_to_addr);
sandesha2_msg_ctx_set_reply_to(rm_msg_ctx, env, reply_to_epr);
sandesha2_seq_property_bean_free(reply_to_bean, env);
}
- if(acks_to_bean)
- {
- axis2_endpoint_ref_t *acks_to_epr = NULL;
-
- acks_to_addr = axutil_strdup(env,
sandesha2_seq_property_bean_get_value(acks_to_bean, env));
- acks_to_epr = axis2_endpoint_ref_create(env, acks_to_addr);
-
- if(acks_to_epr)
- {
- axis2_endpoint_ref_free(acks_to_epr, env);
- }
- sandesha2_seq_property_bean_free(acks_to_bean, env);
- }
-
if(axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
axis2_endpoint_ref_t *reply_to = NULL;
@@ -2258,10 +2283,9 @@
{
AXIS2_FREE(env->allocator, to_addr);
}
-
- if(acks_to_addr)
+ if(reply_to_addr)
{
- AXIS2_FREE(env->allocator, acks_to_addr);
+ AXIS2_FREE(env->allocator, reply_to_addr);
}
return AXIS2_FAILURE;
@@ -2297,10 +2321,9 @@
{
AXIS2_FREE(env->allocator, to_addr);
}
-
- if(acks_to_addr)
+ if(reply_to_addr)
{
- AXIS2_FREE(env->allocator, acks_to_addr);
+ AXIS2_FREE(env->allocator, reply_to_addr);
}
return AXIS2_FAILURE;
@@ -2359,78 +2382,55 @@
/* TODO add_ack_requested */
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
- app_msg_entry = sandesha2_sender_bean_create(env);
- sandesha2_sender_bean_set_internal_seq_id(app_msg_entry, env,
internal_sequence_id);
-
- is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
-
- /*
- * If server side and anonymous acknowledgment in RM 1.0. In other words
this is RM 1.0 replay mode.
- * Note that in this case to_addr is NULL. In duplex mode to_addr cannot
be NULL.
- * */
- if(is_svr_side && sandesha2_utils_is_rm_1_0_anonymous_acks_to(env,
rm_version, acks_to_addr)
- && !to_addr)
+ app_msg_bean = sandesha2_sender_bean_create(env);
+ sandesha2_sender_bean_set_internal_seq_id(app_msg_bean, env,
internal_sequence_id);
+
+ op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
+ if(op_ctx)
{
- axis2_char_t *rmd_sequence_id = NULL;
- sandesha2_msg_ctx_t *req_rm_msg_ctx = NULL;
- axis2_msg_ctx_t *msg_ctx = NULL;
- axis2_msg_ctx_t *req_msg_ctx = NULL;
- axis2_op_ctx_t *op_ctx = NULL;
- sandesha2_seq_t *req_seq = NULL;
-
- msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
- op_ctx = axis2_msg_ctx_get_op_ctx(msg_ctx, env);
req_msg_ctx = axis2_op_ctx_get_msg_ctx(op_ctx, env,
AXIS2_WSDL_MESSAGE_LABEL_IN);
- req_rm_msg_ctx = sandesha2_msg_init_init_msg(env, req_msg_ctx);
- req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg_ctx, env);
- if(!req_seq)
+ if(req_msg_ctx)
{
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence is
NULL");
- AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SEQ_NOT_EXIST,
AXIS2_FAILURE);
- if(rm_version)
- {
- AXIS2_FREE(env->allocator, rm_version);
- }
- if(req_rm_msg_ctx)
- {
- sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
- }
-
- if(acks_to_addr)
- {
- AXIS2_FREE(env->allocator, acks_to_addr);
- }
-
- return AXIS2_FAILURE;
+ req_rm_msg_ctx = sandesha2_msg_init_init_msg(env, req_msg_ctx);
+ req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg_ctx, env);
}
+ }
+ if(req_seq)
+ {
rmd_sequence_id =
sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(req_seq,
- env), env);
- if(!rmd_sequence_id)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence ID
is NULL");
- if(rm_version)
- {
- AXIS2_FREE(env->allocator, rm_version);
- }
- if(req_rm_msg_ctx)
- {
- sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
- }
+ env), env);
+ }
+ if(rmd_sequence_id)
+ {
+ from_acks_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr,
env, rmd_sequence_id,
+ SANDESHA2_SEQ_PROP_ACKS_TO_EPR);
+ }
- if(to_addr)
- {
- AXIS2_FREE(env->allocator, to_addr);
- }
+ if(from_acks_to_bean)
+ {
+ axis2_endpoint_ref_t *from_acks_to_epr = NULL;
- if(acks_to_addr)
- {
- AXIS2_FREE(env->allocator, acks_to_addr);
- }
+ from_acks_to_addr = axutil_strdup(env,
sandesha2_seq_property_bean_get_value(from_acks_to_bean, env));
+ from_acks_to_epr = axis2_endpoint_ref_create(env, from_acks_to_addr);
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "from_acks_to_address:%s",
from_acks_to_addr);
- return AXIS2_FAILURE;
+ if(from_acks_to_epr)
+ {
+ axis2_endpoint_ref_free(from_acks_to_epr, env);
}
+ sandesha2_seq_property_bean_free(from_acks_to_bean, env);
+ }
+
+ is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
+ /*
+ * If server side and anonymous acknowledgment in RM 1.0. In other words
this is RM 1.0 replay mode.
+ * Note that in this case to_addr is NULL. In duplex mode to_addr cannot
be NULL.
+ * */
+ if(is_svr_side && sandesha2_utils_is_rm_1_0_anonymous_acks_to(env,
rm_version, from_acks_to_addr)
+ && !to_addr)
+ {
sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id,
seq_prop_mgr);
if(req_rm_msg_ctx)
{
@@ -2438,7 +2438,7 @@
}
engine = axis2_engine_create(env, conf_ctx);
- status = axis2_engine_resume_send(engine, env, msg_ctx);
+ status = axis2_engine_resume_send(engine, env, app_msg_ctx);
if(engine)
{
axis2_engine_free(engine, env);
@@ -2448,10 +2448,14 @@
{
AXIS2_FREE(env->allocator, to_addr);
}
+ if(reply_to_addr)
+ {
+ AXIS2_FREE(env->allocator, reply_to_addr);
+ }
- if(acks_to_addr)
+ if(from_acks_to_addr)
{
- AXIS2_FREE(env->allocator, acks_to_addr);
+ AXIS2_FREE(env->allocator, from_acks_to_addr);
}
return status;
@@ -2462,29 +2466,21 @@
AXIS2_FREE(env->allocator, to_addr);
}
- sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_entry, env, storage_key);
+ sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_bean, env, storage_key);
millisecs = sandesha2_utils_get_current_time_in_millis(env);
- sandesha2_sender_bean_set_time_to_send(app_msg_entry, env, millisecs);
+ sandesha2_sender_bean_set_time_to_send(app_msg_bean, env, millisecs);
msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
- sandesha2_sender_bean_set_msg_id(app_msg_entry, env, msg_id);
- sandesha2_sender_bean_set_msg_no(app_msg_entry, env, msg_num);
- sandesha2_sender_bean_set_msg_type(app_msg_entry, env,
SANDESHA2_MSG_TYPE_APPLICATION);
-
- internal_seq_bean = sandesha2_seq_property_bean_create_with_data(env,
msg_id,
- SANDESHA2_MSG_CTX_PROP_INTERNAL_SEQUENCE_ID, internal_sequence_id);
-
- if(internal_seq_bean)
- {
- sandesha2_seq_property_mgr_insert(seq_prop_mgr, env,
internal_seq_bean);
- }
+ sandesha2_sender_bean_set_msg_id(app_msg_bean, env, msg_id);
+ sandesha2_sender_bean_set_msg_no(app_msg_bean, env, msg_num);
+ sandesha2_sender_bean_set_msg_type(app_msg_bean, env,
SANDESHA2_MSG_TYPE_APPLICATION);
if(!rms_sequence_bean ||
!sandesha2_seq_property_bean_get_value(rms_sequence_bean, env))
{
- sandesha2_sender_bean_set_send(app_msg_entry, env, AXIS2_FALSE);
+ sandesha2_sender_bean_set_send(app_msg_bean, env, AXIS2_FALSE);
}
else
{
- sandesha2_sender_bean_set_send(app_msg_entry, env, AXIS2_TRUE);
+ sandesha2_sender_bean_set_send(app_msg_bean, env, AXIS2_TRUE);
property = axutil_property_create_with_args(env, 0, 0, 0,
AXIS2_VALUE_TRUE);
axis2_msg_ctx_set_property(app_msg_ctx, env,
SANDESHA2_SET_SEND_TO_TRUE, property);
}
@@ -2492,36 +2488,17 @@
temp_op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
/**
- * In RM one way out only(from client application side) we need to keep the
- * operation context of the application message context marked as in-use.
- * Otherwise when client send the next application message and free the
- * previous op_client this op_ctx is freed.
- */
- /*mep =
axis2_op_get_axis_specific_mep_const(axis2_op_ctx_get_op(temp_op_ctx, env),
env);
- if(AXIS2_MEP_CONSTANT_OUT_ONLY == mep)
- {
- axis2_ctx_t *ctx = axis2_conf_ctx_get_base(conf_ctx, env);
- axutil_property_t *temp_prop = axis2_ctx_get_property(ctx, env,
SANDESHA2_MSG_CTX_MAP);
-
- if(temp_prop)
- {
- axis2_op_ctx_set_in_use(temp_op_ctx, env, AXIS2_TRUE);
- }
- }*/
-
- /**
* When we store application message context as below it should be noted
* that at Sandesha2/C client application side this is actually stored in
* in-memory whereas in the web service side it is actually stored in
- * database only. In RM one way scenario since we call
- * axis2_op_ctx_set_in_use() for the operation context of the application
- * message in few lines above we need to free that operation context in the
- * sandesha2_storage_mgr_remove_msg_ctx() function.
+ * database only.
*/
+ sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_bean);
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key,
app_msg_ctx);
- continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
app_msg_entry,
+ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
app_msg_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Do not continue
sending the message");
@@ -2529,21 +2506,23 @@
{
AXIS2_FREE(env->allocator, rm_version);
}
- if(app_msg_entry)
+ if(app_msg_bean)
{
- sandesha2_sender_bean_free(app_msg_entry, env);
+ sandesha2_sender_bean_free(app_msg_bean, env);
}
- if(acks_to_addr)
+ if(reply_to_addr)
{
- AXIS2_FREE(env->allocator, acks_to_addr);
+ AXIS2_FREE(env->allocator, reply_to_addr);
+ }
+ if(from_acks_to_addr)
+ {
+ AXIS2_FREE(env->allocator, from_acks_to_addr);
}
return AXIS2_FAILURE;
}
-
- sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_entry);
-
+
axis2_msg_ctx_set_current_handler_index(app_msg_ctx, env,
axis2_msg_ctx_get_current_handler_index(app_msg_ctx, env) + 1);
@@ -2558,11 +2537,9 @@
engine = axis2_engine_create(env, conf_ctx);
if(axis2_engine_resume_send(engine, env, app_msg_ctx))
{
-
if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
- status = sandesha2_app_msg_processor_process_app_msg_response(env,
app_msg_ctx,
- storage_mgr);
+ status = sandesha2_app_msg_processor_process_app_msg_response(env,
app_msg_ctx);
}
}
else
@@ -2575,42 +2552,99 @@
axis2_engine_free(engine, env);
}
+ conf = axis2_conf_ctx_get_conf(conf_ctx, env);
+ qname = axutil_qname_create(env, SANDESHA2_MODULE, NULL, NULL);
+ module_desc = axis2_conf_get_module(conf, env, qname);
+ sleep_time_param = axis2_module_desc_get_param(module_desc, env,
SANDESHA2_SENDER_SLEEP);
+ if(sleep_time_param)
+ {
+ sleep_time = AXIS2_ATOI(axutil_param_get_value(sleep_time_param, env));
+ }
+ if(qname)
+ {
+ axutil_qname_free(qname, env);
+ }
- if(sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version,
acks_to_addr))
+ if(!is_svr_side && (!reply_to_addr ||
sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
{
/* If application client side and single channel, resend is done in
the same
* thread as the application client.
*/
+ axis2_transport_out_desc_t *transport_out = NULL;
+ axis2_transport_sender_t *transport_sender = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
sender_bean =
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
{
- /* Application message is acknowledged. No need to resend. */
+ /* There is no pending message to send. */
status = AXIS2_SUCCESS;
}
- else /* sender_bean is not NULL */
+ else
{
+ sandesha2_sender_bean_free(sender_bean, env);
+ transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx,
env);
+ if(transport_out)
+ {
+ transport_sender =
axis2_transport_out_desc_get_sender(transport_out, env);
+ }
+ if(transport_sender)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Transport sender could not be retrieved
from transport_out");
+ status = AXIS2_FAILURE;
+ }
+
while(AXIS2_TRUE)
{
- AXIS2_SLEEP(4);
- status = sandesha2_app_msg_processor_resend(env, conf_ctx,
msg_id, AXIS2_FALSE,
- internal_sequence_id, storage_mgr, seq_prop_mgr,
create_seq_mgr,
- sender_mgr);
+ continue_sending =
sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_bean,
+ conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr,
sender_mgr);
+ if(!continue_sending)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Do not continue sending the
application message");
+ break;
+ }
+
+ AXIS2_SLEEP(sleep_time);
+ if(transport_sender)
+ {
+ /* This is neccessary to avoid a double free */
+ axis2_msg_ctx_set_property(app_msg_ctx, env,
AXIS2_TRANSPORT_IN, NULL);
+ if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env,
app_msg_ctx))
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Transport sender invoke failed in
sending application message");
+ }
+ }
- if(AXIS2_SUCCESS != status)
+ if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
- "[sandesha2] Resend failed for message id %s in
sequence %s", msg_id,
- internal_sequence_id);
+ status =
sandesha2_app_msg_processor_process_app_msg_response(env, app_msg_ctx);
+
+ if(AXIS2_SUCCESS != status)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+ "[sandesha2] Resend failed for message id %s in
sequence %s", msg_id,
+ internal_sequence_id);
+ break;
+ }
+ }
+
+ sender_bean =
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env,
+ internal_sequence_id, msg_id);
+ if(!sender_bean)
+ {
+ /* There is no pending message to send. So exit from the
loop. */
break;
}
-
+ else
+ {
+ sandesha2_sender_bean_free(sender_bean, env);
+ }
}
-
- sandesha2_sender_bean_free(sender_bean, env);
}
if(rm_version)
@@ -2618,9 +2652,13 @@
AXIS2_FREE(env->allocator, rm_version);
}
- if(acks_to_addr)
+ if(reply_to_addr)
{
- AXIS2_FREE(env->allocator, acks_to_addr);
+ AXIS2_FREE(env->allocator, reply_to_addr);
+ }
+ if(from_acks_to_addr)
+ {
+ AXIS2_FREE(env->allocator, from_acks_to_addr);
}
return status;
@@ -2631,57 +2669,22 @@
AXIS2_FREE(env->allocator, rm_version);
}
- if(acks_to_addr)
+ if(reply_to_addr)
{
- AXIS2_FREE(env->allocator, acks_to_addr);
+ AXIS2_FREE(env->allocator, reply_to_addr);
}
- /* If not (single channel) resend in a thread */
- while(AXIS2_TRUE)
+ if(from_acks_to_addr)
{
- sandesha2_sender_bean_t *sender_bean = NULL;
- axis2_char_t *temp_msg_id = NULL;
-
- sender_bean =
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env,
- internal_sequence_id, NULL);
- if(!sender_bean)
- {
- /* There is no pending message to send. So exit from the loop. */
- break;
- }
- else
- {
- temp_msg_id = sandesha2_sender_bean_get_msg_id(sender_bean, env);
- if(!axutil_strcmp(temp_msg_id, msg_id))
- {
- if(sender_bean)
- {
- sandesha2_sender_bean_free(sender_bean, env);
- }
-
- continue;
- }
- }
-
- status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id,
is_svr_side,
- internal_sequence_id, storage_mgr, seq_prop_mgr,
create_seq_mgr,
- sender_mgr);
-
- if(AXIS2_SUCCESS != status)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
- "[sandesha2] Resend failed for message id %s in sequence %s",
msg_id,
- internal_sequence_id);
-
- break;
- }
-
- if(sender_bean)
- {
- sandesha2_sender_bean_free(sender_bean, env);
- }
+ AXIS2_FREE(env->allocator, from_acks_to_addr);
}
+ /* If not (single channel) spawn a thread and see whether acknowledgment
has arrived through the
+ * sandesha2_sender_mgr_get_next_application_msg_to_send() function. If it
has arrived exit from
+ * the thread.*/
+ sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx,
internal_sequence_id,
+ msg_id, is_svr_side, sleep_time);
+
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[Sandesha2]
Exit:sandesha2_app_msg_processor_send_app_msg");
return status;
@@ -2812,11 +2815,11 @@
if(!axis2_msg_ctx_get_server_side(msg_ctx, env))
{
- status = sandesha2_app_msg_processor_process_app_msg_response(env,
msg_ctx, storage_mgr);
+ /*status =
sandesha2_app_msg_processor_process_app_msg_response(env, msg_ctx, storage_mgr);
if(AXIS2_SUCCESS != status)
{
return status;
- }
+ }*/
}
}
@@ -2838,8 +2841,7 @@
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_app_msg_response(
const axutil_env_t *env,
- axis2_msg_ctx_t *msg_ctx,
- sandesha2_storage_mgr_t *storage_mgr)
+ axis2_msg_ctx_t *msg_ctx)
{
axutil_property_t *property = NULL;
axis2_msg_ctx_t *res_msg_ctx = NULL;
@@ -3102,3 +3104,122 @@
return AXIS2_SUCCESS;
}
+/**
+ * Spawn a detached worker thread running
+ * sandesha2_app_msg_processor_application_msg_worker_function() for one
+ * application message.
+ *
+ * @param env                   caller's environment; a thread environment is
+ *                              derived from it with axutil_init_thread_env()
+ * @param conf_ctx              configuration context handed to the worker
+ * @param internal_sequence_id  internal sequence id handed to the worker
+ * @param msg_id                id of the application message handed to the worker
+ * @param is_server_side        handed to the worker unchanged
+ * @param sleep_time            handed to the worker unchanged
+ * @return AXIS2_SUCCESS when the thread was obtained and detached,
+ *         AXIS2_FAILURE when the argument block, the thread environment or
+ *         the thread itself could not be obtained
+ */
+static axis2_status_t
+sandesha2_app_msg_processor_start_application_msg_resender(
+    const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *internal_sequence_id,
+    axis2_char_t *msg_id,
+    const axis2_bool_t is_server_side,
+    int sleep_time)
+{
+    axutil_thread_t *worker_thread = NULL;
+    sandesha2_app_msg_processor_args_t *args = NULL;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+        "[sandesha2] Entry:sandesha2_app_msg_processor_start_application_msg_resender");
+
+    /* The argument block and the thread environment are allocated while the
+     * allocator is switched to its global pool. */
+    axutil_allocator_switch_to_global_pool(env->allocator);
+    args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
+    if(!args)
+    {
+        axutil_allocator_switch_to_local_pool(env->allocator);
+        AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+            "[sandesha2] Could not allocate the application message resender arguments");
+        return AXIS2_FAILURE;
+    }
+
+    args->env = axutil_init_thread_env(env);
+    if(!args->env)
+    {
+        AXIS2_FREE(env->allocator, args);
+        axutil_allocator_switch_to_local_pool(env->allocator);
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+            "[sandesha2] Could not create the application message resender thread environment");
+        return AXIS2_FAILURE;
+    }
+    axutil_allocator_switch_to_local_pool(env->allocator);
+
+    /* NOTE(review): the two strings are handed over by pointer, not copied;
+     * they must stay valid until the worker has taken what it needs - confirm
+     * against the caller's ownership of these buffers. */
+    args->conf_ctx = conf_ctx;
+    args->internal_sequence_id = internal_sequence_id;
+    args->msg_id = msg_id;
+    args->sleep_time = sleep_time;
+    args->is_server_side = is_server_side;
+    /* NOTE(review): this counter is bumped before the thread exists and is not
+     * rolled back on the failure path below; its exact contract is not visible
+     * here - confirm whether a failed start should decrement it. */
+    ++(env->allocator->ref_pool_allocator);
+
+    worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
+        sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
+    if(!worker_thread)
+    {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
+            "[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_application_msg_resender");
+
+        /* No worker will ever consume args, so release what was allocated
+         * above, using the same (global) pool it was taken from. */
+        axutil_allocator_switch_to_global_pool(env->allocator);
+        axutil_free_thread_env(args->env);
+        AXIS2_FREE(env->allocator, args);
+        axutil_allocator_switch_to_local_pool(env->allocator);
+        return AXIS2_FAILURE;
+    }
+
+    axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+        "[sandesha2] Exit:sandesha2_app_msg_processor_start_application_msg_resender");
+    return AXIS2_SUCCESS;
+}
+
+
+/**
+ * Thread entry point: periodically asks the sender manager whether the given
+ * application message is still waiting to be sent and, if so, resends it.
+ * The loop ends when no sender bean is returned for the message or when a
+ * resend attempt does not return AXIS2_SUCCESS.
+ *
+ * @param thd   thread handle supplied by the thread pool (not used)
+ * @param data  a sandesha2_app_msg_processor_args_t *; this function takes
+ *              ownership of the block and frees it
+ * @return always NULL
+ */
+static void * AXIS2_THREAD_FUNC
+sandesha2_app_msg_processor_application_msg_worker_function(
+    axutil_thread_t *thd,
+    void *data)
+{
+    sandesha2_app_msg_processor_args_t *args = NULL;
+    axutil_env_t *env = NULL;
+    sandesha2_storage_mgr_t *storage_mgr = NULL;
+    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
+    sandesha2_sender_mgr_t *sender_mgr = NULL;
+    int sleep_time = 0;
+    axis2_char_t *dbname = NULL;
+    axis2_conf_ctx_t *conf_ctx = NULL;
+    axis2_char_t *internal_sequence_id = NULL;
+    axis2_bool_t is_server_side = AXIS2_FALSE;
+    sandesha2_sender_bean_t *sender_bean = NULL;
+    axis2_char_t *msg_id = NULL;
+    axis2_status_t status = AXIS2_FAILURE;
+
+    args = (sandesha2_app_msg_processor_args_t*) data;
+    if(!args)
+    {
+        /* Nothing to work on and no environment to log with. */
+        return NULL;
+    }
+
+    env = args->env;
+    axutil_allocator_switch_to_global_pool(env->allocator);
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+        "[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
+
+    /* Take private copies of everything in the argument block so that the
+     * block can be released right away and the loop below never reads memory
+     * owned by the thread that started us.
+     * NOTE(review): the copies are made on this thread, so the starter's
+     * strings must still be alive when this code runs - confirm. */
+    conf_ctx = args->conf_ctx;
+    msg_id = axutil_strdup(env, args->msg_id);
+    internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
+    is_server_side = args->is_server_side;
+    sleep_time = args->sleep_time;
+    AXIS2_FREE(env->allocator, args);
+    args = NULL;
+
+    dbname = sandesha2_util_get_dbname(env, conf_ctx);
+    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
+    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
+    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
+
+    while(AXIS2_TRUE)
+    {
+        /* NOTE(review): a sleep_time of 0 makes this a tight polling loop -
+         * confirm that callers always pass a positive value. */
+        AXIS2_SLEEP(sleep_time);
+        sender_bean = sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env,
+            internal_sequence_id, msg_id);
+        if(!sender_bean)
+        {
+            /* There is no pending message to send. So exit from the thread. */
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+                "[sandesha2] There is no pending message to send. So exit from the thread");
+            break;
+        }
+
+        status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
+            internal_sequence_id, storage_mgr, seq_prop_mgr, NULL,
+            sender_mgr);
+
+        /* The bean was only needed as a "still pending" indicator. */
+        sandesha2_sender_bean_free(sender_bean, env);
+        sender_bean = NULL;
+
+        if(AXIS2_SUCCESS != status)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
+                "[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
+                internal_sequence_id);
+            break;
+        }
+    }
+
+    /* NOTE(review): storage_mgr, seq_prop_mgr and sender_mgr are created above
+     * and no release call is made for them in this function - confirm who owns
+     * them and add the matching free calls if it is this thread. */
+    if(internal_sequence_id)
+    {
+        AXIS2_FREE(env->allocator, internal_sequence_id);
+    }
+    if(msg_id)
+    {
+        AXIS2_FREE(env->allocator, msg_id);
+    }
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+        "[sandesha2] Exit:sandesha2_app_msg_processor_application_msg_worker_function");
+
+    return NULL;
+}
+
+
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
Sat Jun 14 06:44:21 2008
@@ -432,25 +432,25 @@
return storage_mgr;
}
-/*AXIS2_EXTERN axis2_char_t* AXIS2_CALL
-sandesha2_utils_get_svr_side_incoming_seq_id(
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL
+sandesha2_utils_get_rmd_seq_id(
const axutil_env_t *env,
- axis2_char_t *incoming_seq_id)
+ axis2_char_t *internal_sequence_id)
{
axis2_char_t *start_str = NULL;
axis2_char_t *ret = NULL;
int start_len = 0;
- AXIS2_PARAM_CHECK(env->error, incoming_seq_id, NULL);
+ AXIS2_PARAM_CHECK(env->error, internal_sequence_id, NULL);
start_str = axutil_strcat(env, SANDESHA2_INTERNAL_SEQ_PREFIX, ":", NULL);
start_len = axutil_strlen(start_str);
- if(0 != axutil_strncmp(incoming_seq_id, start_str, start_len))
+ if(0 != axutil_strncmp(internal_sequence_id, start_str, start_len))
return NULL;
- ret = axutil_strdup(env, (incoming_seq_id + start_len *
sizeof(axis2_char_t)));
+ ret = axutil_strdup(env, (internal_sequence_id + start_len *
sizeof(axis2_char_t)));
return ret;
-}*/
+}
AXIS2_EXTERN sandesha2_property_bean_t* AXIS2_CALL
Modified:
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
URL:
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
---
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
(original)
+++
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
Sat Jun 14 06:44:21 2008
@@ -423,6 +423,7 @@
}
reply_to_epr = axis2_msg_ctx_get_to(req_msg_ctx, env);
+ acks_to_str = (axis2_char_t *)
axis2_endpoint_ref_get_address(reply_to_epr, env);
if(reply_to_epr)
{
@@ -438,7 +439,7 @@
return AXIS2_FAILURE;
}
}
- else
+ else /* Not server side */
{
reply_to_epr = axis2_msg_ctx_get_reply_to(first_app_msg, env);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]