Repository: celix
Updated Branches:
  refs/heads/develop ea11a7851 -> e17228813


Fixed potential deadlock issues reported by Coverity


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/e1722881
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/e1722881
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/e1722881

Branch: refs/heads/develop
Commit: e1722881361fcb62b92e1f5ba104ed29f88efbb1
Parents: ea11a78
Author: gricciardi <[email protected]>
Authored: Thu Jun 22 11:51:29 2017 +0200
Committer: gricciardi <[email protected]>
Committed: Thu Jun 22 11:51:29 2017 +0200

----------------------------------------------------------------------
 .../private/include/pubsub_admin_impl.h         |  5 ++++
 .../private/src/pubsub_admin_impl.c             |  4 +--
 .../private/src/topic_publication.c             | 30 +++++++++++++++-----
 .../private/src/topic_subscription.c            | 22 ++++++++------
 4 files changed, 43 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h 
b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 3c36986..ccfa9e6 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -74,6 +74,11 @@ struct pubsub_admin {
     unsigned int maxPort;
 };
 
+/* Note: correct locking order is
+ * 1. subscriptionsLock
+ * 2. publications locks
+ */
+
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin);
 celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 5ccee2c..953ce16 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -323,6 +323,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                return pubsubAdmin_addAnySubscription(admin,subEP);
        }
 
+       celixThreadMutex_lock(&admin->subscriptionsLock);
        /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
        celixThreadMutex_lock(&admin->localPublicationsLock);
        celixThreadMutex_lock(&admin->externalPublicationsLock);
@@ -378,9 +379,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                        }
 
                        if(status==CELIX_SUCCESS){
-                               
celixThreadMutex_lock(&admin->subscriptionsLock);
                                
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
-                               
celixThreadMutex_unlock(&admin->subscriptionsLock);
                        }
                }
 
@@ -392,6 +391,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
     free(scope_topic);
        celixThreadMutex_unlock(&admin->externalPublicationsLock);
        celixThreadMutex_unlock(&admin->localPublicationsLock);
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
 
        return status;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 28bf56e..2eaad97 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -63,12 +63,13 @@
 
 struct topic_publication {
        zsock_t* zmq_socket;
+       celix_thread_mutex_t socket_lock; //Protects zmq_socket access
        zcert_t * zmq_cert;
        char* endpoint;
        service_registration_pt svcFactoryReg;
        array_list_pt pub_ep_list; //List<pubsub_endpoint>
        hash_map_pt boundServices; //<bundle_pt,bound_service>
-       celix_thread_mutex_t tp_lock;
+       celix_thread_mutex_t tp_lock; // Protects topic_publication data 
structure
        pubsub_serializer_service_t* serializerSvc;
 };
 
@@ -79,11 +80,19 @@ typedef struct publish_bundle_bound_service {
        char *topic;
        pubsub_msg_serializer_map_t* map;
        unsigned short getCount;
-       celix_thread_mutex_t mp_lock;
+       celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service 
data structure
        bool mp_send_in_progress;
        array_list_pt mp_parts;
 } publish_bundle_bound_service_t;
 
+/* Note: correct locking order is
+ * 1. tp_lock
+ * 2. mp_lock
+ * 3. socket_lock
+ *
+ * tp_lock and socket_lock are independent.
+ */
+
 typedef struct pubsub_msg {
        pubsub_msg_header_pt header;
        char* payload;
@@ -211,6 +220,8 @@ celix_status_t 
pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
        pub->zmq_socket = socket;
        pub->serializerSvc = NULL;
 
+       celixThreadMutex_create(&(pub->socket_lock),NULL);
+
 #ifdef BUILD_WITH_ZMQ_SECURITY
        if (pubEP->is_secure){
                pub->zmq_cert = pub_cert;
@@ -241,7 +252,6 @@ celix_status_t 
pubsub_topicPublicationDestroy(topic_publication_pt pub){
        hashMap_destroy(pub->boundServices,false,false);
 
        pub->svcFactoryReg = NULL;
-       zsock_destroy(&(pub->zmq_socket));
 #ifdef BUILD_WITH_ZMQ_SECURITY
        zcert_destroy(&(pub->zmq_cert));
 #endif
@@ -250,6 +260,12 @@ celix_status_t 
pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
        celixThreadMutex_destroy(&(pub->tp_lock));
 
+       celixThreadMutex_lock(&(pub->socket_lock));
+       zsock_destroy(&(pub->zmq_socket));
+       celixThreadMutex_unlock(&(pub->socket_lock));
+
+       celixThreadMutex_destroy(&(pub->socket_lock));
+
        free(pub);
 
        return status;
@@ -570,16 +586,16 @@ static int pubsub_topicPublicationSendMultipart(void 
*handle, unsigned int msgTy
                        }
                        else{
                                arrayList_add(bound->mp_parts,msg);
-                               
celixThreadMutex_lock(&(bound->parent->tp_lock));
+                               
celixThreadMutex_lock(&(bound->parent->socket_lock));
                                snd = 
send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
                                bound->mp_send_in_progress = false;
-                               
celixThreadMutex_unlock(&(bound->parent->tp_lock));
+                               
celixThreadMutex_unlock(&(bound->parent->socket_lock));
                        }
                        break;
                case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:    
//Normal send case
-                       celixThreadMutex_lock(&(bound->parent->tp_lock));
+                       celixThreadMutex_lock(&(bound->parent->socket_lock));
                        snd = 
send_pubsub_msg(bound->parent->zmq_socket,msg,true);
-                       celixThreadMutex_unlock(&(bound->parent->tp_lock));
+                       celixThreadMutex_unlock(&(bound->parent->socket_lock));
                        break;
                default:
                        printf("TP: ERROR: Invalid MP flags combination\n");

http://git-wip-us.apache.org/repos/asf/celix/blob/e1722881/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 1f42fa3..409c7a5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -61,12 +61,12 @@ struct topic_subscription {
        zsock_t* zmq_socket;
        zcert_t * zmq_cert;
        zcert_t * zmq_pub_cert;
-       pthread_mutex_t socket_lock;
+       pthread_mutex_t socket_lock; //Protects zmq_socket access
        service_tracker_pt tracker;
        array_list_pt sub_ep_list;
        celix_thread_t recv_thread;
        bool running;
-       celix_thread_mutex_t ts_lock;
+       celix_thread_mutex_t ts_lock; //Protects topic_subscription data 
structure access
        bundle_context_pt context;
 
        hash_map_pt msgSerializerMapMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
@@ -80,6 +80,11 @@ struct topic_subscription {
        pubsub_serializer_service_t* serializerSvc;
 };
 
+/* Note: correct locking order is
+ * 1. socket_lock
+ * 2. ts_lock
+ */
+
 typedef struct complete_zmq_msg {
        zframe_t* header;
        zframe_t* payload;
@@ -284,17 +289,18 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
        celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
 
-       celixThreadMutex_lock(&ts->socket_lock);
-       zsock_destroy(&(ts->zmq_socket));
        #ifdef BUILD_WITH_ZMQ_SECURITY
        zcert_destroy(&(ts->zmq_cert));
        zcert_destroy(&(ts->zmq_pub_cert));
        #endif
-       celixThreadMutex_unlock(&ts->socket_lock);
-       celixThreadMutex_destroy(&ts->socket_lock);
 
        celixThreadMutex_unlock(&ts->ts_lock);
 
+       celixThreadMutex_lock(&ts->socket_lock);
+       zsock_destroy(&(ts->zmq_socket));
+       celixThreadMutex_unlock(&ts->socket_lock);
+       celixThreadMutex_destroy(&ts->socket_lock);
+
 
        free(ts);
 
@@ -623,8 +629,6 @@ static void* zmq_recv_thread_func(void * arg) {
                                        zframe_destroy(&headerMsg);
                                } else {
 
-                                       celixThreadMutex_lock(&sub->ts_lock);
-
                                        //Let's fetch all the messages from the 
socket
                                        array_list_pt msg_list = NULL;
                                        arrayList_create(&msg_list);
@@ -669,8 +673,8 @@ static void* zmq_recv_thread_func(void * arg) {
                                                }
                                        }
 
+                                       celixThreadMutex_lock(&sub->ts_lock);
                                        process_msg(sub, msg_list);
-
                                        celixThreadMutex_unlock(&sub->ts_lock);
 
                                }

Reply via email to