Fixed small leak in PSA UDP_MC topic_subscription

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/e0d33e5a
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/e0d33e5a
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/e0d33e5a

Branch: refs/heads/develop
Commit: e0d33e5ae70b9383cdd5b9c6381ffc93e89bb8df
Parents: fc720cf
Author: gricciardi <[email protected]>
Authored: Thu Sep 21 10:42:00 2017 +0200
Committer: gricciardi <[email protected]>
Committed: Thu Sep 21 10:42:00 2017 +0200

----------------------------------------------------------------------
 .../private/src/topic_subscription.c            | 135 +++++++++----------
 1 file changed, 66 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/e0d33e5a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 5896264..9bf0f80 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -243,85 +243,82 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
        celix_status_t status = CELIX_SUCCESS;
        celixThreadMutex_lock(&ts->ts_lock);
 
-       if(hashMap_containsKey(ts->socketMap, pubURL)){
-               printf("PSA_UDM_MC_TS: PubURL %s already existing!\n",pubURL);
-               celixThreadMutex_unlock(&ts->ts_lock);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       int *recvSocket = calloc(sizeof(int), 1);
-       *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
-       if (*recvSocket < 0) {
-               perror("pubsub_topicSubscriptionCreate:socket");
-               status = CELIX_SERVICE_EXCEPTION;
-       }
-
-       if (status == CELIX_SUCCESS){
-               int reuse = 1;
-               if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) 
&reuse, sizeof(reuse)) != 0) {
-                       perror("setsockopt() SO_REUSEADDR");
-                       status = CELIX_SERVICE_EXCEPTION;
-               }
-       }
+       if(!hashMap_containsKey(ts->socketMap, pubURL)){
 
-       if(status == CELIX_SUCCESS){
-               // TODO Check if there is a better way to parse the URL to 
IP/Portnr
-               //replace ':' by spaces
-               char *url = strdup(pubURL);
-               char *pt = url;
-               while((pt=strchr(pt, ':')) != NULL) {
-                       *pt = ' ';
-               }
-               char mcIp[100];
-               unsigned short mcPort;
-               sscanf(url, "udp //%s %hu", mcIp, &mcPort);
-               free (url);
-
-               printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, 
Port = %hu\n", mcIp, mcPort);
-
-               struct ip_mreq mc_addr;
-               mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
-               mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
-               printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
-               if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, 
(char*) &mc_addr, sizeof(mc_addr)) != 0) {
-                       perror("setsockopt() IP_ADD_MEMBERSHIP");
+               int *recvSocket = calloc(sizeof(int), 1);
+               *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+               if (*recvSocket < 0) {
+                       perror("pubsub_topicSubscriptionCreate:socket");
                        status = CELIX_SERVICE_EXCEPTION;
                }
 
                if (status == CELIX_SUCCESS){
-                       struct sockaddr_in mcListenAddr;
-                       mcListenAddr.sin_family = AF_INET;
-                       mcListenAddr.sin_addr.s_addr = INADDR_ANY;
-                       mcListenAddr.sin_port = htons(mcPort);
-                       if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, 
sizeof(mcListenAddr)) != 0) {
-                               perror("bind()");
+                       int reuse = 1;
+                       if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, 
(char*) &reuse, sizeof(reuse)) != 0) {
+                               perror("setsockopt() SO_REUSEADDR");
                                status = CELIX_SERVICE_EXCEPTION;
                        }
                }
 
-               if (status == CELIX_SUCCESS){
-#if defined(__APPLE__) && defined(__MACH__)
-                       //TODO: Use kqueue for OSX
-#else
-                       struct epoll_event ev;
-                       memset(&ev, 0, sizeof(ev));
-                       ev.events = EPOLLIN;
-                       ev.data.fd = *recvSocket;
-                       if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, 
*recvSocket, &ev) == -1) {
-                               perror("epoll_ctl() EPOLL_CTL_ADD");
+               if(status == CELIX_SUCCESS){
+                       // TODO Check if there is a better way to parse the URL 
to IP/Portnr
+                       //replace ':' by spaces
+                       char *url = strdup(pubURL);
+                       char *pt = url;
+                       while((pt=strchr(pt, ':')) != NULL) {
+                               *pt = ' ';
+                       }
+                       char mcIp[100];
+                       unsigned short mcPort;
+                       sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+                       free(url);
+
+                       printf("pubsub_topicSubscriptionConnectPublisher : IP = 
%s, Port = %hu\n", mcIp, mcPort);
+
+                       struct ip_mreq mc_addr;
+                       mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+                       mc_addr.imr_interface.s_addr = 
inet_addr(ts->ifIpAddress);
+                       printf("Adding MC %s at interface %s\n", mcIp, 
ts->ifIpAddress);
+                       if (setsockopt(*recvSocket, IPPROTO_IP, 
IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+                               perror("setsockopt() IP_ADD_MEMBERSHIP");
                                status = CELIX_SERVICE_EXCEPTION;
                        }
+
+                       if (status == CELIX_SUCCESS){
+                               struct sockaddr_in mcListenAddr;
+                               mcListenAddr.sin_family = AF_INET;
+                               mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+                               mcListenAddr.sin_port = htons(mcPort);
+                               if(bind(*recvSocket, (struct 
sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+                                       perror("bind()");
+                                       status = CELIX_SERVICE_EXCEPTION;
+                               }
+                       }
+
+                       if (status == CELIX_SUCCESS){
+#if defined(__APPLE__) && defined(__MACH__)
+                               //TODO: Use kqueue for OSX
+#else
+                               struct epoll_event ev;
+                               memset(&ev, 0, sizeof(ev));
+                               ev.events = EPOLLIN;
+                               ev.data.fd = *recvSocket;
+                               if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, 
*recvSocket, &ev) == -1) {
+                                       perror("epoll_ctl() EPOLL_CTL_ADD");
+                                       status = CELIX_SERVICE_EXCEPTION;
+                               }
 #endif
-               }
+                       }
 
-       }
+               }
 
-       if (status == CELIX_SUCCESS){
-               hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket);
-       }else{
-               free(recvSocket);
+               if (status == CELIX_SUCCESS){
+                       hashMap_put(ts->socketMap, strdup(pubURL), 
(void*)recvSocket);
+               }
+               else{
+                       free(recvSocket);
+               }
        }
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -533,15 +530,15 @@ static void* udp_recv_thread_func(void * arg) {
        topic_subscription_pt sub = (topic_subscription_pt) arg;
 
 #if defined(__APPLE__) && defined(__MACH__)
-    //TODO: use kqueue for OSX
-    //struct kevent events[MAX_EPOLL_EVENTS];
-    while (sub->running) {
-       int nfds = 0;
+       //TODO: use kqueue for OSX
+       //struct kevent events[MAX_EPOLL_EVENTS];
+       while (sub->running) {
+               int nfds = 0;
                if(nfds > 0) {
                        pubsub_udp_msg_t* udpMsg = NULL;
                        process_msg(sub, udpMsg);
                }
-    }
+       }
 #else
        struct epoll_event events[MAX_EPOLL_EVENTS];
 

Reply via email to