Author: damitha
Date: Sat Jun 14 06:44:21 2008
New Revision: 667798

URL: http://svn.apache.org/viewvc?rev=667798&view=rev
Log:
Making RM 1.0 single channel work without sender threads

Modified:
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
    
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_utils.h
 Sat Jun 14 06:44:21 2008
@@ -142,10 +142,10 @@
     const axutil_env_t *env,
     axis2_char_t *dbname);
 
-/*AXIS2_EXTERN axis2_char_t* AXIS2_CALL                       
-sandesha2_utils_get_svr_side_incoming_seq_id(
-    const axutil_env_t *env,
-    axis2_char_t *incoming_seq_id);*/
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL                       
+sandesha2_utils_get_rmd_seq_id(
+        const axutil_env_t *env,
+        axis2_char_t *internal_sequence_id);
 
 /**
  * Caller must free the returned string.

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_1_0/rm_echo.c
 Sat Jun 14 06:44:21 2008
@@ -29,7 +29,7 @@
 #include <platforms/axutil_platform_auto_sense.h>
 #include <ctype.h>
 
-#define SANDESHA2_MAX_COUNT 8
+#define SANDESHA2_MAX_COUNT 2
 
 /* on_complete callback function */
 axis2_status_t AXIS2_CALL

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_echo_single_1_0/rm_echo_single.c
 Sat Jun 14 06:44:21 2008
@@ -29,7 +29,7 @@
 #include <axis2_options.h>
 #include <ctype.h>
 
-#define SANDESHA2_MAX_COUNT 8
+#define SANDESHA2_MAX_COUNT 2
 
 static void 
 usage(

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/samples/rm_ping_1_0/rm_ping_1_0.c
 Sat Jun 14 06:44:21 2008
@@ -23,7 +23,7 @@
 #include <sandesha2_constants.h>
 #include <ctype.h>
 
-#define SANDESHA2_SLEEP 5
+#define SANDESHA2_SLEEP 2
 
 axiom_node_t *
 build_om_programatically(

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/ack_msg_processor.c
 Sat Jun 14 06:44:21 2008
@@ -336,9 +336,12 @@
                 env, retrans_list, j);
             if(retrans_bean)
             {
+                axis2_char_t *msg_id = NULL;
+
                 int msg_type = 
sandesha2_sender_bean_get_msg_type(retrans_bean, env);
+                msg_id = sandesha2_sender_bean_get_msg_id(retrans_bean, env);
                 AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                    "[sandesha2] Removing the sender bean with type %d", 
msg_type);
+                    "[sandesha2] Removing the sender bean with type %d and 
msg_id:%s", msg_type, msg_id);
                 sandesha2_sender_mgr_remove(sender_mgr, env, 
                     sandesha2_sender_bean_get_msg_id(retrans_bean, env));
                 sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env,

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
 Sat Jun 14 06:44:21 2008
@@ -49,6 +49,7 @@
 #include <sandesha2_terminate_mgr.h>
 
 #include <axis2_const.h>
+#include <axutil_types.h>
 #include <axis2_msg_ctx.h>
 #include <axutil_string.h>
 #include <axis2_engine.h>
@@ -73,6 +74,32 @@
 #define SANDESHA2_INTF_TO_IMPL(msg_proc) \
                                                
((sandesha2_app_msg_processor_impl_t *)(msg_proc))
 
+typedef struct sandesha2_app_msg_processor_args 
sandesha2_app_msg_processor_args_t;
+
+struct sandesha2_app_msg_processor_args
+{
+    axutil_env_t *env;
+    axis2_conf_ctx_t *conf_ctx;
+    axis2_char_t *internal_sequence_id;
+    axis2_char_t *msg_id;
+    axis2_bool_t is_server_side;
+    int sleep_time;
+};
+
+static void * AXIS2_THREAD_FUNC
+sandesha2_app_msg_processor_application_msg_worker_function(
+    axutil_thread_t *thd, 
+    void *data);
+
+static axis2_status_t
+sandesha2_app_msg_processor_start_application_msg_resender(
+    const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *internal_sequence_id,
+    axis2_char_t *msg_id,
+    const axis2_bool_t is_server_side,
+    int sleep_time);
+
 static axis2_status_t AXIS2_CALL 
 sandesha2_app_msg_processor_process_in_msg (
     sandesha2_msg_processor_t *msg_processor,
@@ -101,8 +128,7 @@
 static axis2_status_t AXIS2_CALL
 sandesha2_app_msg_processor_process_app_msg_response(
     const axutil_env_t *env, 
-    axis2_msg_ctx_t *app_msg_ctx,
-    sandesha2_storage_mgr_t *storage_mgr);
+    axis2_msg_ctx_t *app_msg_ctx);
 
 static axis2_status_t AXIS2_CALL
 sandesha2_app_msg_processor_send_create_seq_msg(
@@ -1657,6 +1683,7 @@
         {
             AXIS2_FREE(env->allocator, acks_to_str);
         }
+        AXIS2_LOG_INFO(env->log, "[Sandesha2] 
Exit:sandesha2_app_msg_processor_send_ack_if_reqd");
         return AXIS2_SUCCESS;
     } 
 
@@ -1875,8 +1902,11 @@
 
     if(axis2_engine_send(engine, env, create_seq_msg))
     {
-        status = sandesha2_app_msg_processor_process_create_seq_response(env, 
create_seq_msg, 
-                storage_mgr);
+        if(!axis2_msg_ctx_get_server_side(create_seq_msg, env))
+        {
+            status = 
sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg, 
+                    storage_mgr);
+        }
     }
     else
     {
@@ -1895,6 +1925,7 @@
     transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
     while(!rms_sequence_bean)
     {
+        AXIS2_SLEEP(2);
         if(transport_sender)
         {
             /* This is neccessary to avoid a double free */
@@ -1905,12 +1936,15 @@
             }
         }
 
-        status = sandesha2_app_msg_processor_process_create_seq_response(env, 
create_seq_msg, 
-            storage_mgr);
-    
-        if(AXIS2_SUCCESS != status)
+        if(!axis2_msg_ctx_get_server_side(create_seq_msg, env))
         {
-            break;
+            status = 
sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg, 
+                storage_mgr);
+    
+            if(AXIS2_SUCCESS != status)
+            {
+                break;
+            }
         }
 
         rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, 
env, internal_sequence_id, 
@@ -1961,13 +1995,12 @@
                 create_seq_msg_ctx, soap_ns_uri);
         if(!res_envelope)
         {
-            /* There is no response message context. But in single channel 
duplex scenario there should be
-             * an CSR in the back channel. So return failure.
+            /* There is no response message context.
              */
 
             AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Response 
envelope not found");
 
-            return AXIS2_FAILURE;
+            return AXIS2_SUCCESS;
         }
     }
 
@@ -2125,12 +2158,13 @@
     axis2_msg_ctx_t *app_msg_ctx = NULL;
     sandesha2_seq_property_bean_t *to_bean = NULL;
     sandesha2_seq_property_bean_t *reply_to_bean = NULL;
-    sandesha2_seq_property_bean_t *acks_to_bean = NULL;
+    sandesha2_seq_property_bean_t *from_acks_to_bean = NULL;
     sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
     axis2_endpoint_ref_t *to_epr = NULL;
     axis2_endpoint_ref_t *reply_to_epr = NULL;
-    axis2_char_t *acks_to_addr = NULL;
+    axis2_char_t *from_acks_to_addr = NULL;
     axis2_char_t *to_addr = NULL;
+    axis2_char_t *reply_to_addr = NULL;
     axis2_char_t *new_to_str = NULL;
     sandesha2_seq_t *seq = NULL;
     sandesha2_seq_t *req_seq = NULL;
@@ -2139,8 +2173,7 @@
     sandesha2_msg_number_t *msg_number = NULL;
     axis2_msg_ctx_t *req_msg = NULL;
     axis2_char_t *str_identifier = NULL;
-    sandesha2_sender_bean_t *app_msg_entry = NULL;
-    sandesha2_seq_property_bean_t *internal_seq_bean = NULL;
+    sandesha2_sender_bean_t *app_msg_bean = NULL;
     long millisecs = 0;
     axutil_property_t *property = NULL;
     axis2_engine_t *engine = NULL;
@@ -2148,11 +2181,20 @@
     axis2_char_t *msg_id = NULL;
     axis2_bool_t last_msg = AXIS2_FALSE;
     axis2_op_ctx_t *temp_op_ctx = NULL;
-    axis2_status_t status = AXIS2_FAILURE;
+    axis2_status_t status = AXIS2_SUCCESS;
     axis2_conf_ctx_t *conf_ctx = NULL;
     axis2_bool_t is_svr_side = AXIS2_FALSE;
     axis2_bool_t continue_sending = AXIS2_TRUE;
     int msg_type = -1;
+    sandesha2_msg_ctx_t *req_rm_msg_ctx = NULL;
+    axis2_msg_ctx_t *req_msg_ctx = NULL;
+    axis2_op_ctx_t *op_ctx = NULL;
+    axis2_char_t *rmd_sequence_id = NULL;
+    axis2_module_desc_t *module_desc = NULL;
+    axutil_qname_t *qname = NULL;
+    axutil_param_t *sleep_time_param = NULL;
+    int sleep_time = 0;
+    axis2_conf_t *conf = NULL;
 
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,   
         "[Sandesha2] Entry:sandesha2_app_msg_processor_send_app_msg");
@@ -2171,9 +2213,6 @@
     reply_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
internal_sequence_id, 
             SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
     
-    acks_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
internal_sequence_id, 
-            SANDESHA2_SEQ_PROP_ACKS_TO_EPR);
-
     rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, 
             internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
 
@@ -2193,27 +2232,13 @@
     
     if(reply_to_bean)
     {
-        reply_to_epr = axis2_endpoint_ref_create(env, 
sandesha2_seq_property_bean_get_value(
-                    reply_to_bean, env));
+        reply_to_addr = axutil_strdup(env, 
sandesha2_seq_property_bean_get_value(reply_to_bean, env));
+        reply_to_epr = axis2_endpoint_ref_create(env, reply_to_addr);
         sandesha2_msg_ctx_set_reply_to(rm_msg_ctx, env, reply_to_epr);
 
         sandesha2_seq_property_bean_free(reply_to_bean, env);
     }
     
-    if(acks_to_bean)
-    {
-        axis2_endpoint_ref_t *acks_to_epr = NULL;
-
-        acks_to_addr = axutil_strdup(env, 
sandesha2_seq_property_bean_get_value(acks_to_bean, env));
-        acks_to_epr = axis2_endpoint_ref_create(env, acks_to_addr);
-
-        if(acks_to_epr)
-        {
-            axis2_endpoint_ref_free(acks_to_epr, env);
-        }
-        sandesha2_seq_property_bean_free(acks_to_bean, env);
-    }
-    
     if(axis2_msg_ctx_get_server_side(app_msg_ctx, env))
     {
         axis2_endpoint_ref_t *reply_to = NULL;
@@ -2258,10 +2283,9 @@
         {
             AXIS2_FREE(env->allocator, to_addr);
         }
-
-        if(acks_to_addr)
+        if(reply_to_addr)
         {
-            AXIS2_FREE(env->allocator, acks_to_addr);
+            AXIS2_FREE(env->allocator, reply_to_addr);
         }
 
         return AXIS2_FAILURE;
@@ -2297,10 +2321,9 @@
             {
                 AXIS2_FREE(env->allocator, to_addr);
             }
-
-            if(acks_to_addr)
+            if(reply_to_addr)
             {
-                AXIS2_FREE(env->allocator, acks_to_addr);
+                AXIS2_FREE(env->allocator, reply_to_addr);
             }
 
             return AXIS2_FAILURE;
@@ -2359,78 +2382,55 @@
     /* TODO add_ack_requested */
 
     sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
-    app_msg_entry = sandesha2_sender_bean_create(env);
-    sandesha2_sender_bean_set_internal_seq_id(app_msg_entry, env, 
internal_sequence_id);
-
-    is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
-
-    /* 
-     * If server side and anonymous acknowledgment in RM 1.0. In other words 
this is RM 1.0 replay mode.
-     * Note that in this case to_addr is NULL. In duplex mode to_addr cannot 
be NULL. 
-     * */
-    if(is_svr_side && sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, 
rm_version, acks_to_addr) 
-            && !to_addr)
+    app_msg_bean = sandesha2_sender_bean_create(env);
+    sandesha2_sender_bean_set_internal_seq_id(app_msg_bean, env, 
internal_sequence_id);
+   
+    op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
+    if(op_ctx)
     {
-        axis2_char_t *rmd_sequence_id = NULL;
-        sandesha2_msg_ctx_t *req_rm_msg_ctx = NULL;
-        axis2_msg_ctx_t *msg_ctx = NULL;
-        axis2_msg_ctx_t *req_msg_ctx = NULL;
-        axis2_op_ctx_t *op_ctx = NULL;
-        sandesha2_seq_t *req_seq = NULL;
-       
-        msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
-        op_ctx = axis2_msg_ctx_get_op_ctx(msg_ctx, env);
         req_msg_ctx =  axis2_op_ctx_get_msg_ctx(op_ctx, env, 
AXIS2_WSDL_MESSAGE_LABEL_IN);
-        req_rm_msg_ctx = sandesha2_msg_init_init_msg(env, req_msg_ctx);
-        req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg_ctx, env);
-        if(!req_seq)
+        if(req_msg_ctx)
         {
-            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence is 
NULL");
-            AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SEQ_NOT_EXIST, 
AXIS2_FAILURE);
-            if(rm_version)
-            {
-                AXIS2_FREE(env->allocator, rm_version);
-            }
-            if(req_rm_msg_ctx)
-            {
-                sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
-            }
-
-            if(acks_to_addr)
-            {
-                AXIS2_FREE(env->allocator, acks_to_addr);
-            }
-            
-            return AXIS2_FAILURE;
+            req_rm_msg_ctx = sandesha2_msg_init_init_msg(env, req_msg_ctx);
+            req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg_ctx, env);
         }
+    }
 
+    if(req_seq)
+    {
         rmd_sequence_id = 
sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(req_seq, 
-                    env), env);
-        if(!rmd_sequence_id)
-        {
-            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence ID 
is NULL");
-            if(rm_version)
-            {
-                AXIS2_FREE(env->allocator, rm_version);
-            }
-            if(req_rm_msg_ctx)
-            {
-                sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
-            }
+                env), env);
+    }
+    if(rmd_sequence_id)
+    {
+        from_acks_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, 
env, rmd_sequence_id, 
+            SANDESHA2_SEQ_PROP_ACKS_TO_EPR);
+    }
 
-            if(to_addr)
-            {
-                AXIS2_FREE(env->allocator, to_addr);
-            }
+    if(from_acks_to_bean)
+    {
+        axis2_endpoint_ref_t *from_acks_to_epr = NULL;
 
-            if(acks_to_addr)
-            {
-                AXIS2_FREE(env->allocator, acks_to_addr);
-            }
+        from_acks_to_addr = axutil_strdup(env, 
sandesha2_seq_property_bean_get_value(from_acks_to_bean, env));
+        from_acks_to_epr = axis2_endpoint_ref_create(env, from_acks_to_addr);
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "from_acks_to_address:%s", 
from_acks_to_addr);
 
-            return AXIS2_FAILURE;
+        if(from_acks_to_epr)
+        {
+            axis2_endpoint_ref_free(from_acks_to_epr, env);
         }
+        sandesha2_seq_property_bean_free(from_acks_to_bean, env);
+    }
+
+    is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
 
+    /* 
+     * If server side and anonymous acknowledgment in RM 1.0. In other words 
this is RM 1.0 replay mode.
+     * Note that in this case to_addr is NULL. In duplex mode to_addr cannot 
be NULL. 
+     * */
+    if(is_svr_side && sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, 
rm_version, from_acks_to_addr) 
+            && !to_addr)
+    {
         sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id, 
seq_prop_mgr);
         if(req_rm_msg_ctx)
         {
@@ -2438,7 +2438,7 @@
         }
 
         engine = axis2_engine_create(env, conf_ctx);
-        status = axis2_engine_resume_send(engine, env, msg_ctx);
+        status = axis2_engine_resume_send(engine, env, app_msg_ctx);
         if(engine)
         {
             axis2_engine_free(engine, env);
@@ -2448,10 +2448,14 @@
         {
             AXIS2_FREE(env->allocator, to_addr);
         }
+        if(reply_to_addr)
+        {
+            AXIS2_FREE(env->allocator, reply_to_addr);
+        }
 
-        if(acks_to_addr)
+        if(from_acks_to_addr)
         {
-            AXIS2_FREE(env->allocator, acks_to_addr);
+            AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
         
         return status;
@@ -2462,29 +2466,21 @@
         AXIS2_FREE(env->allocator, to_addr);
     }
 
-    sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_entry, env, storage_key);
+    sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_bean, env, storage_key);
     millisecs = sandesha2_utils_get_current_time_in_millis(env);
-    sandesha2_sender_bean_set_time_to_send(app_msg_entry, env, millisecs);
+    sandesha2_sender_bean_set_time_to_send(app_msg_bean, env, millisecs);
     msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
-    sandesha2_sender_bean_set_msg_id(app_msg_entry, env, msg_id);
-    sandesha2_sender_bean_set_msg_no(app_msg_entry, env, msg_num);
-    sandesha2_sender_bean_set_msg_type(app_msg_entry, env, 
SANDESHA2_MSG_TYPE_APPLICATION);
-
-    internal_seq_bean = sandesha2_seq_property_bean_create_with_data(env, 
msg_id, 
-        SANDESHA2_MSG_CTX_PROP_INTERNAL_SEQUENCE_ID, internal_sequence_id);
-
-    if(internal_seq_bean)
-    {
-        sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, 
internal_seq_bean);
-    }
+    sandesha2_sender_bean_set_msg_id(app_msg_bean, env, msg_id);
+    sandesha2_sender_bean_set_msg_no(app_msg_bean, env, msg_num);
+    sandesha2_sender_bean_set_msg_type(app_msg_bean, env, 
SANDESHA2_MSG_TYPE_APPLICATION);
 
     if(!rms_sequence_bean || 
!sandesha2_seq_property_bean_get_value(rms_sequence_bean, env))
     {
-        sandesha2_sender_bean_set_send(app_msg_entry, env, AXIS2_FALSE);
+        sandesha2_sender_bean_set_send(app_msg_bean, env, AXIS2_FALSE);
     }
     else
     {
-        sandesha2_sender_bean_set_send(app_msg_entry, env, AXIS2_TRUE);
+        sandesha2_sender_bean_set_send(app_msg_bean, env, AXIS2_TRUE);
         property = axutil_property_create_with_args(env, 0, 0, 0, 
AXIS2_VALUE_TRUE);
         axis2_msg_ctx_set_property(app_msg_ctx, env, 
SANDESHA2_SET_SEND_TO_TRUE, property);
     }
@@ -2492,36 +2488,17 @@
     temp_op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
 
     /**
-     * In RM one way out only(from client application side) we need to keep the
-     * operation context of the application message context marked as in-use.
-     * Otherwise when client send the next application message and free the
-     * previous op_client this op_ctx is freed.
-     */
-    /*mep = 
axis2_op_get_axis_specific_mep_const(axis2_op_ctx_get_op(temp_op_ctx, env), 
env);
-    if(AXIS2_MEP_CONSTANT_OUT_ONLY == mep)
-    {
-        axis2_ctx_t *ctx = axis2_conf_ctx_get_base(conf_ctx, env);
-        axutil_property_t *temp_prop = axis2_ctx_get_property(ctx, env, 
SANDESHA2_MSG_CTX_MAP);
-
-        if(temp_prop)
-        {
-            axis2_op_ctx_set_in_use(temp_op_ctx, env, AXIS2_TRUE);
-        }
-    }*/
-
-    /**
      * When we store application message context as below it should be noted
      * that at Sandesha2/C client application side this is actually stored in
      * in-memory whereas in the web service side it is actually stored in
-     * database only. In RM one way scenario since we call
-     * axis2_op_ctx_set_in_use() for the operation context of the application
-     * message in few lines above we need to free that operation context in the
-     * sandesha2_storage_mgr_remove_msg_ctx() function.
+     * database only.
      */
+    sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_bean);
     sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, 
app_msg_ctx);
 
-    continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, 
app_msg_entry, 
+    continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, 
app_msg_bean, 
             conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
+
     if(!continue_sending)
     {
         AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Do not continue 
sending the message");
@@ -2529,21 +2506,23 @@
         {
             AXIS2_FREE(env->allocator, rm_version);
         }
-        if(app_msg_entry)
+        if(app_msg_bean)
         {
-            sandesha2_sender_bean_free(app_msg_entry, env);
+            sandesha2_sender_bean_free(app_msg_bean, env);
         }
 
-        if(acks_to_addr)
+        if(reply_to_addr)
         {
-            AXIS2_FREE(env->allocator, acks_to_addr);
+            AXIS2_FREE(env->allocator, reply_to_addr);
+        }
+        if(from_acks_to_addr)
+        {
+            AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
 
         return AXIS2_FAILURE;
     }
-
-    sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_entry);
-    
+ 
     axis2_msg_ctx_set_current_handler_index(app_msg_ctx, env, 
             axis2_msg_ctx_get_current_handler_index(app_msg_ctx, env) + 1);
 
@@ -2558,11 +2537,9 @@
     engine = axis2_engine_create(env, conf_ctx);
     if(axis2_engine_resume_send(engine, env, app_msg_ctx))
     {
-
         if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
         {    
-            status = sandesha2_app_msg_processor_process_app_msg_response(env, 
app_msg_ctx, 
-                storage_mgr);
+            status = sandesha2_app_msg_processor_process_app_msg_response(env, 
app_msg_ctx);
         }
     }
     else
@@ -2575,42 +2552,99 @@
         axis2_engine_free(engine, env);
     }
 
+    conf = axis2_conf_ctx_get_conf(conf_ctx, env);
+    qname = axutil_qname_create(env, SANDESHA2_MODULE, NULL, NULL);
+    module_desc = axis2_conf_get_module(conf, env, qname);
+    sleep_time_param = axis2_module_desc_get_param(module_desc, env, 
SANDESHA2_SENDER_SLEEP);
+    if(sleep_time_param)
+    {
+        sleep_time = AXIS2_ATOI(axutil_param_get_value(sleep_time_param, env));
+    }
+    if(qname)
+    {
+        axutil_qname_free(qname, env);
+    }
 
-    if(sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, 
acks_to_addr))
+    if(!is_svr_side && (!reply_to_addr || 
sandesha2_utils_is_rm_1_0_anonymous_acks_to(env, rm_version, reply_to_addr)))
     {
         /* If application client side and single channel, resend is done in 
the same 
          * thread as the application client.
          */
+        axis2_transport_out_desc_t *transport_out = NULL;
+        axis2_transport_sender_t *transport_sender = NULL;
         sandesha2_sender_bean_t *sender_bean = NULL;
 
         sender_bean = 
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env, 
                 internal_sequence_id, msg_id);
         if(!sender_bean)
         {
-            /* Application message is acknowledged. No need to resend. */
+            /* There is no pending message to send. */
             status = AXIS2_SUCCESS;
         }
-        else /* sender_bean is not NULL */
+        else
         {
+            sandesha2_sender_bean_free(sender_bean, env);
+            transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, 
env);
+            if(transport_out)
+            {
+                transport_sender = 
axis2_transport_out_desc_get_sender(transport_out, env);
+            }
+            if(transport_sender)
+            {
+                AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                        "[sandesha2] Transport sender could not be retrieved 
from transport_out");
+                status = AXIS2_FAILURE;
+            }
+
             while(AXIS2_TRUE)
             {
-                AXIS2_SLEEP(4);
-                status = sandesha2_app_msg_processor_resend(env, conf_ctx, 
msg_id, AXIS2_FALSE,
-                    internal_sequence_id, storage_mgr, seq_prop_mgr, 
create_seq_mgr, 
-                    sender_mgr);
+                continue_sending = 
sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_bean, 
+                        conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, 
sender_mgr);
+                if(!continue_sending)
+                {
+                    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                            "[sandesha2] Do not continue sending the 
application message");
+                    break;
+                }
+
+                AXIS2_SLEEP(sleep_time);
+                if(transport_sender)
+                {
+                    /* This is neccessary to avoid a double free */
+                    axis2_msg_ctx_set_property(app_msg_ctx, env, 
AXIS2_TRANSPORT_IN, NULL);
+                    if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, 
app_msg_ctx))
+                    {
+                        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                                "[sandesha2] Transport sender invoke failed in 
sending application message");
+                    }
+                }
 
-                if(AXIS2_SUCCESS != status)
+                if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
                 {
-                    AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
-                        "[sandesha2] Resend failed for  message id %s in 
sequence %s", msg_id, 
-                        internal_sequence_id);
+                    status = 
sandesha2_app_msg_processor_process_app_msg_response(env, app_msg_ctx);
+            
+                    if(AXIS2_SUCCESS != status)
+                    {
+                        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                            "[sandesha2] Resend failed for  message id %s in 
sequence %s", msg_id, 
+                            internal_sequence_id);
 
+                        break;
+                    }
+                }
+                
+                sender_bean = 
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env, 
+                    internal_sequence_id, msg_id);
+                if(!sender_bean)
+                {
+                    /* There is no pending message to send. So exit from the 
loop. */
                     break;
                 }
-
+                else
+                {
+                    sandesha2_sender_bean_free(sender_bean, env);
+                }
             }
-
-            sandesha2_sender_bean_free(sender_bean, env);
         }
 
         if(rm_version)
@@ -2618,9 +2652,13 @@
             AXIS2_FREE(env->allocator, rm_version);
         }
 
-        if(acks_to_addr)
+        if(reply_to_addr)
         {
-            AXIS2_FREE(env->allocator, acks_to_addr);
+            AXIS2_FREE(env->allocator, reply_to_addr);
+        }
+        if(from_acks_to_addr)
+        {
+            AXIS2_FREE(env->allocator, from_acks_to_addr);
         }
 
         return status;
@@ -2631,57 +2669,22 @@
         AXIS2_FREE(env->allocator, rm_version);
     }
 
-    if(acks_to_addr)
+    if(reply_to_addr)
     {
-        AXIS2_FREE(env->allocator, acks_to_addr);
+        AXIS2_FREE(env->allocator, reply_to_addr);
     }
 
-    /* If not (single channel) resend in a thread */
-    while(AXIS2_TRUE)
+    if(from_acks_to_addr)
     {
-        sandesha2_sender_bean_t *sender_bean = NULL;
-        axis2_char_t *temp_msg_id = NULL;
-
-        sender_bean = 
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env, 
-                internal_sequence_id, NULL);
-        if(!sender_bean)
-        {
-            /* There is no pending message to send. So exit from the loop. */
-            break;
-        }
-        else
-        {
-            temp_msg_id = sandesha2_sender_bean_get_msg_id(sender_bean, env);
-            if(!axutil_strcmp(temp_msg_id, msg_id))
-            {
-                if(sender_bean)
-                {
-                    sandesha2_sender_bean_free(sender_bean, env); 
-                }
-
-                continue;
-            }
-        }
-
-        status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, 
is_svr_side,
-                internal_sequence_id, storage_mgr, seq_prop_mgr, 
create_seq_mgr, 
-                sender_mgr);
-
-        if(AXIS2_SUCCESS != status)
-        {
-            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
-                "[sandesha2] Resend failed for  message id %s in sequence %s", 
msg_id, 
-                internal_sequence_id);
-
-            break;
-        }
-
-        if(sender_bean)
-        {
-            sandesha2_sender_bean_free(sender_bean, env); 
-        }
+        AXIS2_FREE(env->allocator, from_acks_to_addr);
     }
 
+    /* If not (single channel) spawn a thread and see whether acknowledgment 
has arrived through the 
+     * sandesha2_sender_mgr_get_next_application_msg_to_send() function. If it 
has arrived exit from
+     * the thread.*/
+    sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx, 
internal_sequence_id, 
+            msg_id, is_svr_side, sleep_time);
+
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[Sandesha2] 
Exit:sandesha2_app_msg_processor_send_app_msg");
 
     return status;
@@ -2812,11 +2815,11 @@
 
         if(!axis2_msg_ctx_get_server_side(msg_ctx, env))
         {
-            status = sandesha2_app_msg_processor_process_app_msg_response(env, 
msg_ctx, storage_mgr);
+            /*status = 
sandesha2_app_msg_processor_process_app_msg_response(env, msg_ctx, storage_mgr);
             if(AXIS2_SUCCESS != status)
             {
                 return status;
-            }
+            }*/
         }
     }
 
@@ -2838,8 +2841,7 @@
 static axis2_status_t AXIS2_CALL
 sandesha2_app_msg_processor_process_app_msg_response(
     const axutil_env_t *env, 
-    axis2_msg_ctx_t *msg_ctx,
-    sandesha2_storage_mgr_t *storage_mgr)
+    axis2_msg_ctx_t *msg_ctx)
 {
     axutil_property_t *property = NULL;
     axis2_msg_ctx_t *res_msg_ctx = NULL;
@@ -3102,3 +3104,122 @@
     return AXIS2_SUCCESS;
 }
 
+static axis2_status_t
+sandesha2_app_msg_processor_start_application_msg_resender(
+    const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *internal_sequence_id,
+    axis2_char_t *msg_id,
+    const axis2_bool_t is_server_side,
+    int sleep_time)
+{
+    axutil_thread_t *worker_thread = NULL;
+    sandesha2_app_msg_processor_args_t *args = NULL;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] 
Entry:sandesha2_app_msg_processor_start_application_msg_resender");
+    
+    axutil_allocator_switch_to_global_pool(env->allocator);
+    args = AXIS2_MALLOC(env->allocator, 
sizeof(sandesha2_app_msg_processor_args_t));
+    args->env = axutil_init_thread_env(env);
+    axutil_allocator_switch_to_local_pool(env->allocator);
+    args->conf_ctx = conf_ctx;
+    args->internal_sequence_id = internal_sequence_id;
+    args->msg_id = msg_id;
+    args->sleep_time = sleep_time;
+    args->is_server_side = is_server_side;
+    ++(env->allocator->ref_pool_allocator);
+
+    worker_thread = axutil_thread_pool_get_thread(env->thread_pool, 
+            sandesha2_app_msg_processor_application_msg_worker_function, 
(void*)args);
+    if(!worker_thread)
+    {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Thread creation failed for 
sandesha2_app_msg_processor_start_application_msg_resender");
+        return AXIS2_FAILURE;
+    }
+
+    axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
+            "[sandesha2] 
Exit:sandesha2_app_msg_processor_start_application_msg_resender");
+    return AXIS2_SUCCESS;
+}
+
+static void * AXIS2_THREAD_FUNC
+sandesha2_app_msg_processor_application_msg_worker_function(
+    axutil_thread_t *thd, 
+    void *data)
+{
+    sandesha2_app_msg_processor_args_t *args;
+    axutil_env_t *env = NULL;
+    sandesha2_storage_mgr_t *storage_mgr = NULL;
+    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
+    sandesha2_sender_mgr_t *sender_mgr = NULL;
+    int sleep_time = 0;
+    axis2_char_t *dbname = NULL;
+    axis2_conf_ctx_t *conf_ctx = NULL;
+    axis2_char_t *internal_sequence_id = NULL;
+    axis2_bool_t is_server_side = AXIS2_FALSE;
+    sandesha2_sender_bean_t *sender_bean = NULL;
+    axis2_char_t *msg_id = NULL;
+    axis2_status_t status = AXIS2_FAILURE;
+
+    args = (sandesha2_app_msg_processor_args_t*) data;
+    env = args->env;
+    axutil_allocator_switch_to_global_pool(env->allocator);
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2] 
Entry:sandesha2_app_msg_processor_application_msg_worker_function");
+    conf_ctx = args->conf_ctx;
+    msg_id = args->msg_id;
+    internal_sequence_id = axutil_strdup(env, args->internal_sequence_id);
+    is_server_side = args->is_server_side;
+    sleep_time = args->sleep_time;
+    dbname = sandesha2_util_get_dbname(env, conf_ctx);
+    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
+    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
+    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
+
+
+    while(AXIS2_TRUE)
+    {
+        AXIS2_SLEEP(sleep_time);
+        sender_bean = 
sandesha2_sender_mgr_get_next_application_msg_to_send(sender_mgr, env, 
+                internal_sequence_id, msg_id);
+        if(!sender_bean)
+        {
+            /* There is no pending message to send. So exit from the thread. */
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                    "[sandesha2] There is no pending message to send. So exit 
from the thread");
+            break;
+        }
+
+        status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, 
is_server_side,
+                internal_sequence_id, storage_mgr, seq_prop_mgr, NULL, 
+                sender_mgr);
+
+        if(AXIS2_SUCCESS != status)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                "[sandesha2] Resend failed for  message id %s in sequence %s", 
msg_id, 
+                internal_sequence_id);
+
+            if(sender_bean)
+            {
+                sandesha2_sender_bean_free(sender_bean, env); 
+            }
+            break;
+        }
+
+        if(sender_bean)
+        {
+            sandesha2_sender_bean_free(sender_bean, env); 
+        }
+    }
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2] 
Entry:sandesha2_app_msg_processor_application_msg_worker_function");
+    
+    return NULL;
+}
+

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/sandesha2_utils.c
 Sat Jun 14 06:44:21 2008
@@ -432,25 +432,25 @@
     return storage_mgr;
 }
 
-/*AXIS2_EXTERN axis2_char_t* AXIS2_CALL                       
-sandesha2_utils_get_svr_side_incoming_seq_id(
+AXIS2_EXTERN axis2_char_t* AXIS2_CALL                       
+sandesha2_utils_get_rmd_seq_id(
         const axutil_env_t *env,
-        axis2_char_t *incoming_seq_id)
+        axis2_char_t *internal_sequence_id)
 {
     axis2_char_t *start_str = NULL;
     axis2_char_t *ret = NULL;
     int start_len = 0;
     
-    AXIS2_PARAM_CHECK(env->error, incoming_seq_id, NULL);
+    AXIS2_PARAM_CHECK(env->error, internal_sequence_id, NULL);
     
     start_str = axutil_strcat(env, SANDESHA2_INTERNAL_SEQ_PREFIX, ":", NULL);
     start_len = axutil_strlen(start_str);
-    if(0 != axutil_strncmp(incoming_seq_id, start_str, start_len))
+    if(0 != axutil_strncmp(internal_sequence_id, start_str, start_len))
         return NULL;
-    ret = axutil_strdup(env, (incoming_seq_id + start_len * 
sizeof(axis2_char_t)));
+    ret = axutil_strdup(env, (internal_sequence_id + start_len * 
sizeof(axis2_char_t)));
     
     return ret;    
-}*/
+}
 
 
 AXIS2_EXTERN sandesha2_property_bean_t* AXIS2_CALL

Modified: 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c?rev=667798&r1=667797&r2=667798&view=diff
==============================================================================
--- 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
 (original)
+++ 
webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/util/seq_mgr.c
 Sat Jun 14 06:44:21 2008
@@ -423,6 +423,7 @@
         }
 
         reply_to_epr = axis2_msg_ctx_get_to(req_msg_ctx, env);
+        acks_to_str = (axis2_char_t *) 
axis2_endpoint_ref_get_address(reply_to_epr, env);
 
         if(reply_to_epr)
         {
@@ -438,7 +439,7 @@
             return AXIS2_FAILURE;
         }        
     }
-    else
+    else /* Not server side */
     {
         reply_to_epr = axis2_msg_ctx_get_reply_to(first_app_msg, env);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to