Refactored serializers management

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/3b99cc34
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/3b99cc34
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/3b99cc34

Branch: refs/heads/develop
Commit: 3b99cc34cd3712b39a34e5789363ab497e85a75e
Parents: 623d47e
Author: gricciardi <[email protected]>
Authored: Wed Sep 20 15:38:21 2017 +0200
Committer: gricciardi <[email protected]>
Committed: Wed Sep 20 15:38:21 2017 +0200

----------------------------------------------------------------------
 pubsub/api/pubsub/publisher.h                   |   2 +-
 pubsub/api/pubsub/subscriber.h                  |   2 +-
 .../examples/mp_pubsub/publisher/CMakeLists.txt |   2 +-
 .../mp_pubsub/subscriber/CMakeLists.txt         |   8 +-
 pubsub/examples/pubsub/publisher/CMakeLists.txt |   2 +-
 .../publisher/private/src/pubsub_publisher.c    |  10 +-
 .../examples/pubsub/publisher2/CMakeLists.txt   |   2 +-
 .../examples/pubsub/subscriber/CMakeLists.txt   |   2 +-
 pubsub/pubsub_admin_udp_mc/CMakeLists.txt       |   3 +-
 .../private/include/pubsub_admin_impl.h         |  36 +-
 .../include/pubsub_publish_service_private.h    |  59 --
 .../private/include/topic_publication.h         |  57 ++
 .../private/include/topic_subscription.h        |   9 +-
 .../private/src/psa_activator.c                 |  38 +-
 .../private/src/pubsub_admin_impl.c             | 741 ++++++++++++-------
 .../private/src/topic_publication.c             | 299 +++-----
 .../private/src/topic_subscription.c            | 546 +++++++-------
 pubsub/pubsub_admin_zmq/CMakeLists.txt          |   3 +-
 .../private/include/pubsub_admin_impl.h         |  38 +-
 .../include/pubsub_publish_service_private.h    |  51 --
 .../private/include/topic_publication.h         |  49 ++
 .../private/include/topic_subscription.h        |   7 +-
 .../private/src/psa_activator.c                 |  36 +-
 .../private/src/pubsub_admin_impl.c             | 708 ++++++++++++------
 .../private/src/topic_publication.c             | 269 +++----
 .../private/src/topic_subscription.c            | 502 ++++++-------
 .../pubsub_common/public/include/pubsub_admin.h |  22 +-
 .../public/include/pubsub_admin_match.h         |  27 +
 .../public/include/pubsub_common.h              |   8 +-
 .../public/include/pubsub_endpoint.h            |  12 +-
 .../public/include/pubsub_serializer.h          |  27 +-
 pubsub/pubsub_common/public/src/log_helper.c    |  48 +-
 .../public/src/pubsub_admin_match.c             | 303 ++++++++
 .../pubsub_common/public/src/pubsub_endpoint.c  | 184 +++--
 .../private/include/pubsub_discovery_impl.h     |   1 -
 .../pubsub_discovery/private/src/etcd_common.c  |   1 +
 .../pubsub_discovery/private/src/etcd_watcher.c | 216 +++---
 .../pubsub_discovery/private/src/etcd_writer.c  | 160 ++--
 .../private/src/psd_activator.c                 |   2 +-
 .../private/src/pubsub_discovery_impl.c         |  13 +-
 .../private/include/pubsub_serializer_impl.h    |  23 +-
 .../private/src/ps_activator.c                  |   6 +-
 .../private/src/pubsub_serializer_impl.c        | 339 +++++----
 .../private/include/pubsub_topology_manager.h   |  11 -
 .../private/src/pstm_activator.c                |  64 +-
 .../private/src/pubsub_topology_manager.c       | 526 ++++---------
 46 files changed, 2940 insertions(+), 2534 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/api/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/pubsub/api/pubsub/publisher.h b/pubsub/api/pubsub/publisher.h
index 4bc6c8c..3eec149 100644
--- a/pubsub/api/pubsub/publisher.h
+++ b/pubsub/api/pubsub/publisher.h
@@ -30,7 +30,7 @@
 #include <stdlib.h>
 
 #define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
-#define PUBSUB_PUBLISHER_SERVICE_VERSION        "1.0.0"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION             "2.0.0"
  
 //properties
 #define PUBSUB_PUBLISHER_TOPIC                  "pubsub.topic"

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/api/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/pubsub/api/pubsub/subscriber.h b/pubsub/api/pubsub/subscriber.h
index cbbe96c..5d87b8a 100644
--- a/pubsub/api/pubsub/subscriber.h
+++ b/pubsub/api/pubsub/subscriber.h
@@ -30,7 +30,7 @@
 #include <stdbool.h>
 
 #define PUBSUB_SUBSCRIBER_SERVICE_NAME          "pubsub.subscriber"
-#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "1.0.0"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "2.0.0"
  
 //properties
 #define PUBSUB_SUBSCRIBER_TOPIC                "pubsub.topic"

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt b/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
index de77156..76a01f1 100644
--- a/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
+++ b/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
@@ -34,7 +34,7 @@ bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
        
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
        
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
        
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
-       DESTINATION "META-INF/descriptors/messages"
+       DESTINATION "META-INF/descriptors"
 )
 
 bundle_files(org.apache.celix.pubsub_publisher.MpPublisher

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt b/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
index 75ec635..a480a73 100644
--- a/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
+++ b/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
@@ -31,10 +31,10 @@ add_bundle( org.apache.celix.pubsub_subscriber.MpSubscriber
 )
 
 bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
-           
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
-           
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
-           
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
-       DESTINATION "META-INF/descriptors/messages"
+    
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
+    
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
+    
${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
+       DESTINATION "META-INF/descriptors"
 )
 
 bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/pubsub/publisher/CMakeLists.txt b/pubsub/examples/pubsub/publisher/CMakeLists.txt
index d932611..e35c137 100644
--- a/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@ -32,7 +32,7 @@ add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher
 bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher
                
${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
                
${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
-    DESTINATION "META-INF/descriptors/messages"
+    DESTINATION "META-INF/descriptors"
 )
 
 bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
----------------------------------------------------------------------
diff --git a/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 66454a0..b798ea1 100644
--- a/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -77,18 +77,20 @@ static void* send_thread(void* arg){
                while(stop==false){
                        place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
                        place->position.lon = randCoordinate(MIN_LON,MAX_LON);
-                       int nr_char = (int)randCoordinate(5,100000);
-                       //int nr_char = 25;
+                       //int nr_char = (int)randCoordinate(5,100000);
+                       int nr_char = 32;
                        place->data = calloc(nr_char, 1);
                        for(int i = 0; i < (nr_char-1); i++) {
                                place->data[i] = i%10 + '0';
                        }
                        if(publish_svc->send) {
-                               
publish_svc->send(publish_svc->handle,msgId,place);
+                               
if(publish_svc->send(publish_svc->handle,msgId,place)==0){
+                                       printf("Sent %s [%f, %f] (%s, %s) data 
len = %d\n",st_struct->topic, place->position.lat, 
place->position.lon,place->name,place->description, nr_char);
+                               }
                        } else {
                                printf("No send for %s\n", st_struct->topic);
                        }
-                       printf("Sent %s [%f, %f] (%s, %s) data len = 
%d\n",st_struct->topic, place->position.lat, 
place->position.lon,place->name,place->description, nr_char);
+
                        free(place->data);
                        sleep(2);
                }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/publisher2/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/pubsub/examples/pubsub/publisher2/CMakeLists.txt
index c44a760..b83f7dd 100644
--- a/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/pubsub/examples/pubsub/publisher2/CMakeLists.txt
@@ -32,7 +32,7 @@ add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher2
 bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
        
${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
        
${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
-    DESTINATION "META-INF/descriptors/messages"
+    DESTINATION "META-INF/descriptors"
 )
 
 bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/pubsub/examples/pubsub/subscriber/CMakeLists.txt
index da6a362..7fd9fae 100644
--- a/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -33,7 +33,7 @@ add_bundle(org.apache.celix.pubsub_subscriber.PoiSubscriber
 bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
            
${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
            
${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
-    DESTINATION "META-INF/descriptors/messages"
+    DESTINATION "META-INF/descriptors"
 )
 
 bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 1ac0c2d..86f7a47 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -37,10 +37,11 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        private/src/large_udp.c
        ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
        ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
+       
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
 )
 
 set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc 
PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc 
celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES})
+target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc 
celix_framework celix_utils celix_dfi)
 
 install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 89e6547..731b037 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -24,20 +24,23 @@
  *  \copyright Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_ADMIN_IMPL_H_
-#define PUBSUB_ADMIN_IMPL_H_
+#ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_
+#define PUBSUB_ADMIN_UDP_MC_IMPL_H_
 
 #include "pubsub_admin.h"
-#include "pubsub_serializer.h"
 #include "log_helper.h"
 
-struct pubsub_admin {
+#define PUBSUB_ADMIN_TYPE      "udp_mc"
 
-       pubsub_serializer_service_t* serializerSvc;
+struct pubsub_admin {
 
        bundle_context_pt bundle_context;
        log_helper_pt loghelper;
 
+       /* List of the available serializers */
+       celix_thread_mutex_t serializerListLock; // List<serializers>
+       array_list_pt serializerList;
+
        celix_thread_mutex_t localPublicationsLock;
        hash_map_pt localPublications;//<topic(string),service_factory_pt>
 
@@ -50,15 +53,24 @@ struct pubsub_admin {
        celix_thread_mutex_t pendingSubscriptionsLock;
        hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
 
+       /* Those are used to keep track of valid subscriptions/publications 
that still have no valid serializer */
+       celix_thread_mutex_t noSerializerPendingsLock;
+       array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+       array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+       celix_thread_mutex_t usedSerializersLock;
+       hash_map_pt topicSubscriptionsPerSerializer; // 
<serializer,List<topicSubscription>>
+       hash_map_pt topicPublicationsPerSerializer; // 
<serializer,List<topicPublications>>
+
        char* ifIpAddress; // The local interface which is used for multicast 
communication
-    char* mcIpAddress; // The multicast IP address
+       char* mcIpAddress; // The multicast IP address
 
        int sendSocket;
+       void* zmq_context; // to be removed
 
 };
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin);
-celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin);
 celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
 
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
@@ -70,10 +82,10 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* 
scope, char* topic);
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope, char* topic);
 
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score);
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score);
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service);
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
 
-#endif /* PUBSUB_ADMIN_IMPL_H_ */
+#endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
deleted file mode 100644
index b43fb08..0000000
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publish_service_private.h
- *
- *  \date       Sep 24, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-
-#include "publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-#include "pubsub_serializer.h"
-
-#define UDP_BASE_PORT  49152
-#define UDP_MAX_PORT   65000
-
-typedef struct pubsub_udp_msg {
-    struct pubsub_msg_header header;
-    unsigned int payloadSize;
-    char payload[];
-} pubsub_udp_msg_t;
-
-typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out);
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
-
-celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* serializerSvc);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub);
-
-#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h
new file mode 100644
index 0000000..4363d71
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h
@@ -0,0 +1,57 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+#define UDP_BASE_PORT  49152
+#define UDP_MAX_PORT   65000
+
+typedef struct pubsub_udp_msg {
+    struct pubsub_msg_header header;
+    unsigned int payloadSize;
+    char payload[];
+} pubsub_udp_msg_t;
+
+typedef struct topic_publication *topic_publication_pt;
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* 
bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
index a65cb6b..475416a 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -38,20 +38,21 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt 
bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* 
topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* ifIp,char* scope, char* topic 
,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
 
+celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL);
+
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
 celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
 
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc);
-
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);
 unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
index cb298fe..cd4ee07 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
@@ -28,6 +28,7 @@
 
 #include "bundle_activator.h"
 #include "service_registration.h"
+#include "service_tracker.h"
 
 #include "pubsub_admin_impl.h"
 
@@ -35,6 +36,7 @@ struct activator {
        pubsub_admin_pt admin;
        pubsub_admin_service_pt adminService;
        service_registration_pt registration;
+       service_tracker_pt serializerTracker;
 };
 
 celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
@@ -47,7 +49,28 @@ celix_status_t bundleActivator_create(bundle_context_pt 
context, void **userData
        }
        else{
                *userData = activator;
+
                status = pubsubAdmin_create(context, &(activator->admin));
+
+               if(status == CELIX_SUCCESS){
+                       service_tracker_customizer_pt customizer = NULL;
+                       status = 
serviceTrackerCustomizer_create(activator->admin,
+                                       NULL,
+                                       pubsubAdmin_serializerAdded,
+                                       NULL,
+                                       pubsubAdmin_serializerRemoved,
+                                       &customizer);
+                       if(status == CELIX_SUCCESS){
+                               status = serviceTracker_create(context, 
PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+                               if(status != CELIX_SUCCESS){
+                                       
serviceTrackerCustomizer_destroy(customizer);
+                                       pubsubAdmin_destroy(activator->admin);
+                               }
+                       }
+                       else{
+                               pubsubAdmin_destroy(activator->admin);
+                       }
+               }
        }
 
        return status;
@@ -73,16 +96,14 @@ celix_status_t bundleActivator_start(void * userData, 
bundle_context_pt context)
                pubsubAdminSvc->closeAllPublications = 
pubsubAdmin_closeAllPublications;
                pubsubAdminSvc->closeAllSubscriptions = 
pubsubAdmin_closeAllSubscriptions;
 
-               pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
-               pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
-
-               pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer;
-               pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer;
+               pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
 
                activator->adminService = pubsubAdminSvc;
 
                status = bundleContext_registerService(context, 
PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
 
+               status += serviceTracker_open(activator->serializerTracker);
+
        }
 
 
@@ -93,10 +114,10 @@ celix_status_t bundleActivator_stop(void * userData, 
bundle_context_pt context)
        celix_status_t status = CELIX_SUCCESS;
        struct activator *activator = userData;
 
-       serviceRegistration_unregister(activator->registration);
-       activator->registration = NULL;
+       status += serviceTracker_close(activator->serializerTracker);
+       status += serviceRegistration_unregister(activator->registration);
 
-       pubsubAdmin_stop(activator->admin);
+       activator->registration = NULL;
 
        free(activator->adminService);
        activator->adminService = NULL;
@@ -108,6 +129,7 @@ celix_status_t bundleActivator_destroy(void * userData, 
bundle_context_pt contex
        celix_status_t status = CELIX_SUCCESS;
        struct activator *activator = userData;
 
+       serviceTracker_destroy(activator->serializerTracker);
        pubsubAdmin_destroy(activator->admin);
        activator->admin = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index bbb452d..6f9427b 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -57,9 +57,10 @@
 
 #include "pubsub_admin_impl.h"
 #include "topic_subscription.h"
-#include "pubsub_publish_service_private.h"
+#include "topic_publication.h"
 #include "pubsub_endpoint.h"
 #include "subscriber.h"
+#include "pubsub_admin_match.h"
 
 static const char *DEFAULT_MC_IP = "224.100.1.1";
 static char *DEFAULT_MC_PREFIX = "224.100";
@@ -67,7 +68,10 @@ static char *DEFAULT_MC_PREFIX = "224.100";
 static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip);
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, 
pubsub_endpoint_pt psEP, double* score);
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication);
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin) {
        celix_status_t status = CELIX_SUCCESS;
@@ -86,11 +90,19 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                (*admin)->subscriptions = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
                (*admin)->pendingSubscriptions = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
                (*admin)->externalPublications = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+               (*admin)->topicSubscriptionsPerSerializer = 
hashMap_create(NULL, NULL, NULL, NULL);
+               (*admin)->topicPublicationsPerSerializer  = 
hashMap_create(NULL, NULL, NULL, NULL);
+               arrayList_create(&((*admin)->noSerializerSubscriptions));
+               arrayList_create(&((*admin)->noSerializerPublications));
+               arrayList_create(&((*admin)->serializerList));
 
                celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
                celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
                celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
NULL);
                celixThreadMutex_create(&(*admin)->externalPublicationsLock, 
NULL);
+               celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, 
NULL);
+               celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+               celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 
                if (logHelper_create(context, &(*admin)->loghelper) == 
CELIX_SUCCESS) {
                        logHelper_start((*admin)->loghelper);
@@ -104,7 +116,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                if (mc_ip == NULL) {
                        const char *mc_prefix = NULL;
                        const char *interface = NULL;
-                       int b0 = 224, b1 = 100, b2 = 1, b3 = 1;
+                       int b0, b1, b2, b3;
                        
bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
                        if(mc_prefix == NULL) {
                                mc_prefix = DEFAULT_MC_PREFIX;
@@ -112,12 +124,12 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
 
                        bundleContext_getProperty(context, PSA_ITF, &interface);
                        if (pubsubAdmin_getIpAddress(interface, &if_ip) != 
CELIX_SUCCESS) {
-                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP address for interface %s", 
interface);
+                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for 
interface %s", interface);
                        }
 
                        printf("IP Detected : %s\n", if_ip);
                        if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, 
&b3) != 4) {
-                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA: Could not parse IP address %s", if_ip);
+                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
                                b2 = 1;
                                b3 = 1;
                        }
@@ -127,57 +139,41 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                        int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
                        if(sendSocket == -1) {
                                perror("pubsubAdmin_create:socket");
-                               status = CELIX_SERVICE_EXCEPTION;
+                               return CELIX_SERVICE_EXCEPTION;
                        }
-                       else{
-                               char loop = 1;
-                               if(setsockopt(sendSocket, IPPROTO_IP, 
IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
-                                       
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
-                                       status = CELIX_SERVICE_EXCEPTION;
-                               }
-
-                               if (status == CELIX_SUCCESS){
-                                       struct in_addr multicast_interface;
-                                       inet_aton(if_ip, &multicast_interface);
-                                       if(setsockopt(sendSocket,  IPPROTO_IP, 
IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
-                                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
-                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                       }
-                                       else{
-                                               (*admin)->sendSocket = 
sendSocket;
-                                       }
-                               }
-
-                               if(status!=CELIX_SUCCESS){
-                                       close(sendSocket);
-                               }
-
+                       char loop = 1;
+                       if(setsockopt(sendSocket, IPPROTO_IP, 
IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
+                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
+                               return CELIX_SERVICE_EXCEPTION;
                        }
 
+                       struct in_addr multicast_interface;
+                       inet_aton(if_ip, &multicast_interface);
+                       if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, 
&multicast_interface, sizeof(multicast_interface)) != 0) {
+                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
+                               return CELIX_SERVICE_EXCEPTION;
+                       }
 
+                       (*admin)->sendSocket = sendSocket;
 
                }
 #endif
                if (if_ip != NULL) {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA: Using %s as interface for multicast communication", 
if_ip);
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast 
communication", if_ip);
                        (*admin)->ifIpAddress = if_ip;
                } else {
                        (*admin)->ifIpAddress = strdup("127.0.0.1");
                }
 
                if (mc_ip != NULL) {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", mc_ip);
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
                        (*admin)->mcIpAddress = mc_ip;
                }
                else {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. 
Using %s", DEFAULT_MC_IP);
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation 
set. Using %s", DEFAULT_MC_IP);
                        (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
                }
 
-               if (status != CELIX_SUCCESS){
-                       pubsubAdmin_destroy(*admin);
-               }
-
        }
 
        return status;
@@ -221,6 +217,36 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
        hashMap_destroy(admin->externalPublications,false,false);
        celixThreadMutex_unlock(&admin->externalPublicationsLock);
 
+       celixThreadMutex_lock(&admin->serializerListLock);
+       arrayList_destroy(admin->serializerList);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+       arrayList_destroy(admin->noSerializerSubscriptions);
+       arrayList_destroy(admin->noSerializerPublications);
+       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
+       while(hashMapIterator_hasNext(iter)){
+               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
+
+       iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
+       while(hashMapIterator_hasNext(iter)){
+               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+       celixThreadMutex_destroy(&admin->usedSerializersLock);
+       celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+       celixThreadMutex_destroy(&admin->serializerListLock);
        celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
        celixThreadMutex_destroy(&admin->subscriptionsLock);
        celixThreadMutex_destroy(&admin->localPublicationsLock);
@@ -235,12 +261,6 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
        return status;
 }
 
-celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       return status;
-}
-
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
@@ -251,52 +271,68 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
        if(any_sub==NULL){
 
                int i;
+               pubsub_serializer_service_t *best_serializer = NULL;
+               if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
+                       status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
+               }
+               else{
+                       printf("PSA_UDP_MC: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                       arrayList_add(admin->noSerializerSubscriptions,subEP);
+                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+               }
 
-               status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, 
admin->bundle_context, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, 
PUBSUB_ANY_SUB_TOPIC,&any_sub);
+               if (status == CELIX_SUCCESS){
 
-               /* Connect all internal publishers */
-               celixThreadMutex_lock(&admin->localPublicationsLock);
-               hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
-               while(hashMapIterator_hasNext(lp_iter)){
-                       service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
-                       topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                       array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-                       if(topic_publishers!=NULL){
-                               for(i=0;i<arrayList_size(topic_publishers);i++){
-                                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                       if(pubEP->endpoint !=NULL){
-                                               status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                       /* Connect all internal publishers */
+                       celixThreadMutex_lock(&admin->localPublicationsLock);
+                       hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
+                       while(hashMapIterator_hasNext(lp_iter)){
+                               service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
+                               topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
+                               array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                               if(topic_publishers!=NULL){
+                                       
for(i=0;i<arrayList_size(topic_publishers);i++){
+                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                                               if(pubEP->endpoint !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               }
                                        }
+                                       arrayList_destroy(topic_publishers);
                                }
                        }
-               }
-               hashMapIterator_destroy(lp_iter);
-               celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-               /* Connect also all external publishers */
-               celixThreadMutex_lock(&admin->externalPublicationsLock);
-               hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
-               while(hashMapIterator_hasNext(extp_iter)){
-                       array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
-                       if(ext_pub_list!=NULL){
-                               for(i=0;i<arrayList_size(ext_pub_list);i++){
-                                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                       if(pubEP->endpoint !=NULL){
-                                               status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                       hashMapIterator_destroy(lp_iter);
+                       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+                       /* Connect also all external publishers */
+                       celixThreadMutex_lock(&admin->externalPublicationsLock);
+                       hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
+                       while(hashMapIterator_hasNext(extp_iter)){
+                               array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
+                               if(ext_pub_list!=NULL){
+                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
+                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                                               if(pubEP->endpoint !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               }
                                        }
                                }
                        }
-               }
-               hashMapIterator_destroy(extp_iter);
-               celixThreadMutex_unlock(&admin->externalPublicationsLock);
+                       hashMapIterator_destroy(extp_iter);
+                       
celixThreadMutex_unlock(&admin->externalPublicationsLock);
 
 
-               pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
+                       pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
 
-               status += pubsub_topicSubscriptionStart(any_sub);
+                       status += pubsub_topicSubscriptionStart(any_sub);
+
+               }
 
-               
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+               if (status == CELIX_SUCCESS){
+                       
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+                       connectTopicPubSubToSerializer(admin, best_serializer, 
any_sub, false);
+               }
 
        }
 
@@ -308,16 +344,16 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic);
+       printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic);
 
        if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
                return pubsubAdmin_addAnySubscription(admin,subEP);
        }
 
-       celixThreadMutex_lock(&admin->subscriptionsLock);
        /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
        celixThreadMutex_lock(&admin->localPublicationsLock);
        celixThreadMutex_lock(&admin->externalPublicationsLock);
+
        char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic);
 
        service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -330,54 +366,71 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
        }
        else{
                int i;
-
                topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
 
                if(subscription == NULL) {
+                       pubsub_serializer_service_t *best_serializer = NULL;
+                       if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
+                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, 
subEP->scope, subEP->topic, best_serializer, &subscription);
+                       }
+                       else{
+                               printf("PSA_UDP_MC: Cannot find a serializer 
for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
 
-                       status += 
pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, 
admin->serializerSvc, subEP->scope, subEP->topic,&subscription);
+                       if (status==CELIX_SUCCESS){
 
-                       /* Try to connect internal publishers */
-                       if(factory!=NULL){
-                               topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                               array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+                               /* Try to connect internal publishers */
+                               if(factory!=NULL){
+                                       topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
+                                       array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
 
-                               if(topic_publishers!=NULL){
-                                       
for(i=0;i<arrayList_size(topic_publishers);i++){
-                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                       if(topic_publishers!=NULL){
+                                               
for(i=0;i<arrayList_size(topic_publishers);i++){
+                                                       pubsub_endpoint_pt 
pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                                                       if(pubEP->endpoint 
!=NULL){
+                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                                       }
                                                }
+                                               
arrayList_destroy(topic_publishers);
                                        }
-                               }
 
-                       }
+                               }
 
-                       /* Look also for external publishers */
-                       if(ext_pub_list!=NULL){
-                               for(i=0;i<arrayList_size(ext_pub_list);i++){
-                                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                       if(pubEP->endpoint !=NULL){
-                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                               /* Look also for external publishers */
+                               if(ext_pub_list!=NULL){
+                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
+                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                                               if(pubEP->endpoint !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                               }
                                        }
                                }
-                       }
 
-                       
pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+                               
pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+
+                               status += 
pubsub_topicSubscriptionStart(subscription);
 
-                       status += pubsub_topicSubscriptionStart(subscription);
+                       }
 
                        if(status==CELIX_SUCCESS){
+                               
celixThreadMutex_lock(&admin->subscriptionsLock);
                                
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+                               
celixThreadMutex_unlock(&admin->subscriptionsLock);
+                               connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false);
                        }
                }
-               pubsub_topicIncreaseNrSubscribers(subscription);
 
+               if (status == CELIX_SUCCESS){
+                       pubsub_topicIncreaseNrSubscribers(subscription);
+               }
        }
+
        free(scope_topic);
        celixThreadMutex_unlock(&admin->externalPublicationsLock);
        celixThreadMutex_unlock(&admin->localPublicationsLock);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
 
        return status;
 
@@ -386,12 +439,13 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
+       printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
 
        celixThreadMutex_lock(&admin->subscriptionsLock);
+
        char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
        topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       free(scope_topic);
+
        if(sub!=NULL){
                pubsub_topicDecreaseNrSubscribers(sub);
                if(pubsub_topicGetNrSubscribers(sub) == 0) {
@@ -399,9 +453,16 @@ celix_status_t 
pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
                }
        }
        else{
-               status = CELIX_ILLEGAL_STATE;
+               /* Maybe the endpoint was pending */
+               celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+               if(!arrayList_removeElement(admin->noSerializerSubscriptions, 
subEP)){
+                       status = CELIX_ILLEGAL_STATE;
+               }
+               celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
        }
 
+       free(scope_topic);
+
        celixThreadMutex_unlock(&admin->subscriptionsLock);
 
        return status;
@@ -411,112 +472,120 @@ celix_status_t 
pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic);
+       printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, 
pubEP->topic);
 
        const char* fwUUID = NULL;
 
        
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
        if(fwUUID==NULL){
-               printf("PSA: Cannot retrieve fwUUID.\n");
+               printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
        char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
 
-       if((strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint==NULL)){
+       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
 
                celixThreadMutex_lock(&admin->localPublicationsLock);
 
-
-               service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+               service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
 
                if (factory == NULL) {
                        topic_publication_pt pub = NULL;
-                       status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, 
admin->mcIpAddress,&pub);
-                       pubsub_topicPublicationSetSerializer(pub, 
admin->serializerSvc); //TODO move back to contructor
-                       //TODO this is certainly needed when admin are created 
per available serializer
-                       if(status == CELIX_SUCCESS){
-                               status = 
pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
-                               if(status==CELIX_SUCCESS && factory !=NULL){
-                                       
hashMap_put(admin->localPublications,strdup(scope_topic),factory);
-                               }
+                       pubsub_serializer_service_t *best_serializer = NULL;
+                       if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, 
&best_serializer)) == CELIX_SUCCESS){
+                               status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, 
admin->mcIpAddress, &pub);
                        }
                        else{
-                               printf("PSA: Cannot create a topicPublication 
for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
+                               printf("PSA_UDP_MC: Cannot find a serializer 
for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerPublications,pubEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
                        }
-               }
-               else{
+
+                       if (status == CELIX_SUCCESS) {
+                               status = 
pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+                               if (status == CELIX_SUCCESS && factory != NULL) 
{
+                                       hashMap_put(admin->localPublications, 
strdup(scope_topic), factory);
+                                       connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
+                               }
+                       } else {
+                               printf("PSA_UDP_MC: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
+                       }
+               } else {
                        //just add the new EP to the list
-                       topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-                       pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+                       topic_publication_pt pub = (topic_publication_pt) 
factory->handle;
+                       pubsub_topicPublicationAddPublisherEP(pub, pubEP);
                }
 
-
                celixThreadMutex_unlock(&admin->localPublicationsLock);
        }
        else{
+
                celixThreadMutex_lock(&admin->externalPublicationsLock);
-               array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-               if(ext_pub_list==NULL){
+               array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
+               if (ext_pub_list == NULL) {
                        arrayList_create(&ext_pub_list);
-                       
hashMap_put(admin->externalPublications,strdup(scope_topic),ext_pub_list);
+                       hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list);
                }
 
-               arrayList_add(ext_pub_list,pubEP);
+               arrayList_add(ext_pub_list, pubEP);
 
                celixThreadMutex_unlock(&admin->externalPublicationsLock);
        }
 
-       /* Connect the new publisher to the subscription for his topic, if 
there is any */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && pubEP->endpoint!=NULL){
-               pubsub_topicSubscriptionConnectPublisher(sub,pubEP->endpoint);
-       }
-
-       /* And check also for ANY subscription */
-       topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && pubEP->endpoint!=NULL){
-               
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
-       }
-
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
        /* Re-evaluate the pending subscriptions */
        celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 
-       hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions,scope_topic);
-       if(pendingSub!=NULL){ //There were pending subscription for the just 
published topic. Let's connect them.
-               char* key = (char*)hashMapEntry_getKey(pendingSub);
-               array_list_pt pendingSubList = 
(array_list_pt)hashMapEntry_getValue(pendingSub);
+       hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+       if (pendingSub != NULL) { //There were pending subscription for the 
just published topic. Let's connect them.
+               char* topic = (char*) hashMapEntry_getKey(pendingSub);
+               array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
                int i;
-               for(i=0;i<arrayList_size(pendingSubList);i++){
-                       pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(pendingSubList,i);
-                       pubsubAdmin_addSubscription(admin,subEP);
+               for (i = 0; i < arrayList_size(pendingSubList); i++) {
+                       pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) 
arrayList_get(pendingSubList, i);
+                       pubsubAdmin_addSubscription(admin, subEP);
                }
-               hashMap_remove(admin->pendingSubscriptions,key);
+               hashMap_remove(admin->pendingSubscriptions, scope_topic);
                arrayList_clear(pendingSubList);
                arrayList_destroy(pendingSubList);
-               free(key);
+               free(topic);
        }
-       free(scope_topic);
 
        celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 
+       /* Connect the new publisher to the subscription for his topic, if 
there is any */
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+
+       topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
+       if (sub != NULL && pubEP->endpoint != NULL) {
+               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
+       }
+
+       /* And check also for ANY subscription */
+       topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+       if (any_sub != NULL && pubEP->endpoint != NULL) {
+               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
+       }
+
+       free(scope_topic);
+
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
        return status;
 
 }
 
 celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
+       int count = 0;
 
-       printf("PSA: Removing publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic);
+       printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, 
pubEP->topic);
 
        const char* fwUUID = NULL;
 
        
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
        if(fwUUID==NULL){
-               printf("PSA: Cannot retrieve fwUUID.\n");
+               printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
        char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
@@ -531,7 +600,12 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                        pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
                }
                else{
-                       status = CELIX_ILLEGAL_STATE;
+                       /* Maybe the endpoint was pending */
+                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                       
if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+                               status = CELIX_ILLEGAL_STATE;
+                       }
+                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
                }
 
                celixThreadMutex_unlock(&admin->localPublicationsLock);
@@ -546,15 +620,23 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                        for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
                                pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
                                found = pubsubEndpoint_equals(pubEP,p);
-                               if(found){
+                               if (found){
                                        arrayList_remove(ext_pub_list,i);
                                }
                        }
+                       // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
+                       for(i=0; i<arrayList_size(ext_pub_list);i++) {
+                               pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                               if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+                                       count++;
+                               }
+                       }
+
                        if(arrayList_size(ext_pub_list)==0){
                                hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic);
                                char* topic = (char*)hashMapEntry_getKey(entry);
                                array_list_pt list = 
(array_list_pt)hashMapEntry_getValue(entry);
-                               
hashMap_remove(admin->externalPublications,scope_topic);
+                               
hashMap_remove(admin->externalPublications,topic);
                                arrayList_destroy(list);
                                free(topic);
                        }
@@ -567,15 +649,16 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
        celixThreadMutex_lock(&admin->subscriptionsLock);
 
        topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && pubEP->endpoint!=NULL){
-               
pubsub_topicSubscriptionDisconnectPublisher(sub,pubEP->endpoint);
+       if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
        }
 
        /* And check also for ANY subscription */
        topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && pubEP->endpoint!=NULL){
-               
pubsub_topicSubscriptionDisconnectPublisher(any_sub,pubEP->endpoint);
+       if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
        }
+
        free(scope_topic);
        celixThreadMutex_unlock(&admin->subscriptionsLock);
 
@@ -586,7 +669,7 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char 
*scope, char* topic){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Closing all publications for scope=%s,topic=%s\n", scope, 
topic);
+       printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", 
scope, topic);
 
        celixThreadMutex_lock(&admin->localPublicationsLock);
        char* scope_topic =createScopeTopicKey(scope, topic);
@@ -598,6 +681,7 @@ celix_status_t 
pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop
                status += pubsub_topicPublicationStop(pub);
                status += pubsub_topicPublicationDestroy(pub);
 
+               disconnectTopicPubSubFromSerializer(admin, pub, true);
                hashMap_remove(admin->localPublications,scope_topic);
                free(key);
                free(factory);
@@ -612,7 +696,7 @@ celix_status_t 
pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char 
*scope, char* topic){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Closing all subscriptions\n");
+       printf("PSA_UDP_MC: Closing all subscriptions\n");
 
        celixThreadMutex_lock(&admin->subscriptionsLock);
        char* scope_topic =createScopeTopicKey(scope, topic);
@@ -624,6 +708,7 @@ celix_status_t 
pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco
                status += pubsub_topicSubscriptionStop(ts);
                status += pubsub_topicSubscriptionDestroy(ts);
 
+               disconnectTopicPubSubFromSerializer(admin, ts, false);
                hashMap_remove(admin->subscriptions,topic);
                free(topic);
 
@@ -635,92 +720,6 @@ celix_status_t 
pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco
 
 }
 
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-       status = pubsubAdmin_match(admin, pubEP, score);
-       return status;
-}
-
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-       status = pubsubAdmin_match(admin, subEP, score);
-       return status;
-}
-
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
-       celix_status_t status = CELIX_SUCCESS;
-       admin->serializerSvc = serializerSvc;
-
-       /* Add serializer to all topic_publication_pt */
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       hash_map_iterator_pt lp_iter = 
hashMapIterator_create(admin->localPublications);
-       while(hashMapIterator_hasNext(lp_iter)){
-               service_factory_pt factory = (service_factory_pt) 
hashMapIterator_nextValue(lp_iter);
-               topic_publication_pt topic_pub = (topic_publication_pt) 
factory->handle;
-               pubsub_topicPublicationSetSerializer(topic_pub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(lp_iter);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       /* Add serializer to all topic_subscription_pt */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       hash_map_iterator_pt subs_iter = 
hashMapIterator_create(admin->subscriptions);
-       while(hashMapIterator_hasNext(subs_iter)){
-               topic_subscription_pt topic_sub = (topic_subscription_pt) 
hashMapIterator_nextValue(subs_iter);
-               pubsub_topicSubscriptionSetSerializer(topic_sub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(subs_iter);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-}
-
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
-       celix_status_t status = CELIX_SUCCESS;
-       admin->serializerSvc = NULL;
-
-       /* Remove serializer from all topic_publication_pt */
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       hash_map_iterator_pt lp_iter = 
hashMapIterator_create(admin->localPublications);
-       while(hashMapIterator_hasNext(lp_iter)){
-               service_factory_pt factory = (service_factory_pt) 
hashMapIterator_nextValue(lp_iter);
-               topic_publication_pt topic_pub = (topic_publication_pt) 
factory->handle;
-               pubsub_topicPublicationRemoveSerializer(topic_pub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(lp_iter);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       /* Remove serializer from all topic_subscription_pt */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       hash_map_iterator_pt subs_iter = 
hashMapIterator_create(admin->subscriptions);
-       while(hashMapIterator_hasNext(subs_iter)){
-               topic_subscription_pt topic_sub = (topic_subscription_pt) 
hashMapIterator_nextValue(subs_iter);
-               pubsub_topicSubscriptionRemoveSerializer(topic_sub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(subs_iter);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-}
-
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, 
pubsub_endpoint_pt psEP, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-
-       char topic_psa_prop[1024];
-       snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic);
-
-       const char* psa_to_use = NULL;
-       bundleContext_getPropertyWithDefault(admin->bundle_context, 
topic_psa_prop, PSA_DEFAULT, &psa_to_use);
-
-       *score = 0;
-       if (strcmp(psa_to_use, "udp") == 0){
-               *score += 100;
-       }else{
-               *score += 1;
-       }
-
-       return status;
-}
 
 #ifndef ANDROID
 static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip) {
@@ -764,11 +763,245 @@ static celix_status_t 
pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt a
        array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
        if(pendingListPerTopic==NULL){
                arrayList_create(&pendingListPerTopic);
-               
hashMap_put(admin->pendingSubscriptions,scope_topic,pendingListPerTopic);
-       } else {
-               free(scope_topic);
+               
hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
        }
        arrayList_add(pendingListPerTopic,subEP);
+       free(scope_topic);
+
+       return status;
+}
+
+/**
+ * Handles the arrival of a new serializer service.
+ *
+ * handle    - the pubsub_admin_pt this callback operates on
+ * reference - service reference of the serializer; it must carry the
+ *             PUBSUB_SERIALIZER_TYPE_KEY property
+ * service   - the serializer service instance (not used in this function)
+ *
+ * The reference is appended to admin->serializerList and every endpoint in
+ * the two "no serializer" pending lists is re-evaluated: when a best
+ * serializer can now be found for it, the endpoint is handed to
+ * pubsubAdmin_addSubscription / pubsubAdmin_addPublication.
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type
+ * property, CELIX_SUCCESS otherwise (the results of the re-evaluation calls
+ * are not propagated).
+ */
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service){
+       /* Assumption: serializers are all available at startup.
+        * If a new (possibly better) serializer is installed and started, 
already created topic_publications/subscriptions will not be destroyed and 
recreated */
+
+       celix_status_t status = CELIX_SUCCESS;
+       int i=0;
+
+       const char *serType = NULL;
+       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType == NULL){
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+       celixThreadMutex_lock(&admin->serializerListLock);
+       arrayList_add(admin->serializerList, reference); /* the service reference is stored, not the service pointer */
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       /* Now let's re-evaluate the pending endpoints that previously had no usable serializer.
+        * NOTE(review): entries are not removed from the pending lists in this function, and the
+        * add functions are called while noSerializerPendingsLock is held - presumably they cope
+        * with both; confirm against pubsubAdmin_addSubscription/pubsubAdmin_addPublication. */
+       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+       for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
+               pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+               pubsub_serializer_service_t *best_serializer = NULL;
+               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
+                       pubsubAdmin_addSubscription(admin, ep);
+               }
+       }
+
+       for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
+               pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+               pubsub_serializer_service_t *best_serializer = NULL;
+               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
+                       pubsubAdmin_addPublication(admin, ep);
+               }
+       }
+
+       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+       printf("PSA_UDP_MC: %s serializer added\n",serType);
 
        return status;
 }
+/**
+ * Handles the departure of a serializer service.
+ *
+ * handle    - the pubsub_admin_pt this callback operates on
+ * reference - service reference of the serializer being removed; it must
+ *             carry the PUBSUB_SERIALIZER_TYPE_KEY property
+ * service   - the serializer service instance; used as the key into
+ *             topicPublicationsPerSerializer / topicSubscriptionsPerSerializer
+ *
+ * The reference is dropped from admin->serializerList. Every topic
+ * publication and topic subscription registered under this serializer is
+ * stopped, its endpoints are removed from the admin and parked on the
+ * matching "no serializer" pending list, its entry is removed from
+ * localPublications / subscriptions, and it is finally destroyed.
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type
+ * property, CELIX_SUCCESS otherwise (results of the individual teardown
+ * calls are not checked).
+ */
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
+
+       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+       int i=0, j=0;
+       const char *serType = NULL;
+
+       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType == NULL){
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       celixThreadMutex_lock(&admin->serializerListLock); /* both locks are held until the end of the teardown */
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+
+       /* Remove the serializer from the list */
+       arrayList_removeElement(admin->serializerList, reference);
+
+       /* Now destroy the topicPublications, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+       array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+       if(topicPubList!=NULL){
+               for(i=0;i<arrayList_size(topicPubList);i++){
+                       topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
+                       /* Stop the topic publication */
+                       pubsub_topicPublicationStop(topicPub);
+                       /* Get the endpoints that are going to be orphan */
+                       array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
+                       for(j=0;j<arrayList_size(pubList);j++){
+                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubList,j);
+                               /* Remove the publication */
+                               pubsubAdmin_removePublication(admin, pubEP);
+                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
+                               if(pubEP->endpoint!=NULL){
+                                       free(pubEP->endpoint);
+                                       pubEP->endpoint = NULL;
+                               }
+                               /* Add the orphan endpoint to the noSerializer 
pending list */
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerPublications,pubEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+                       arrayList_destroy(pubList); /* only the list itself is destroyed; the endpoints now live on the pending list */
+
+                       /* Cleanup also the localPublications hashmap: look up the entry whose factory handle is this topicPub */
+                       celixThreadMutex_lock(&admin->localPublicationsLock);
+                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
+                       char *key = NULL;
+                       service_factory_pt factory = NULL;
+                       while(hashMapIterator_hasNext(iter)){
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
+                               factory = 
(service_factory_pt)hashMapEntry_getValue(entry);
+                               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
+                               if(pub==topicPub){
+                                       key = (char*)hashMapEntry_getKey(entry);
+                                       break;
+                               }
+                       }
+                       hashMapIterator_destroy(iter);
+                       if(key!=NULL){ /* key is only set on a match, so factory is the matching entry's value here */
+                               hashMap_remove(admin->localPublications, key);
+                               free(factory);
+                               free(key);
+                       }
+                       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+                       /* Finally destroy the topicPublication */
+                       pubsub_topicPublicationDestroy(topicPub);
+               }
+               arrayList_destroy(topicPubList);
+       }
+
+       /* Now destroy the topicSubscriptions, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+       array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+       if(topicSubList!=NULL){
+               for(i=0;i<arrayList_size(topicSubList);i++){
+                       topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
+                       /* Stop the topic subscription */
+                       pubsub_topicSubscriptionStop(topicSub);
+                       /* Get the endpoints that are going to be orphan.
+                        * NOTE(review): unlike pubList above, subList is not destroyed in this function;
+                        * presumably the getter returns the subscription's own list rather than a copy - confirm. */
+                       array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
+                       for(j=0;j<arrayList_size(subList);j++){
+                               pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(subList,j);
+                               /* Remove the subscription */
+                               pubsubAdmin_removeSubscription(admin, subEP);
+                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
+                               if(subEP->endpoint!=NULL){
+                                       free(subEP->endpoint);
+                                       subEP->endpoint = NULL;
+                               }
+                               /* Add the orphan endpoint to the noSerializer 
pending list */
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+
+                       /* Cleanup also the subscriptions hashmap: look up the entry whose value is this topicSub */
+                       celixThreadMutex_lock(&admin->subscriptionsLock);
+                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
+                       char *key = NULL;
+                       while(hashMapIterator_hasNext(iter)){
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
+                               topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
+                               if(sub==topicSub){
+                                       key = (char*)hashMapEntry_getKey(entry);
+                                       break;
+                               }
+                       }
+                       hashMapIterator_destroy(iter);
+                       if(key!=NULL){
+                               hashMap_remove(admin->subscriptions, key);
+                               free(key);
+                       }
+                       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+                       /* Finally destroy the topicSubscription */
+                       pubsub_topicSubscriptionDestroy(topicSub);
+               }
+               arrayList_destroy(topicSubList);
+       }
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       printf("PSA_UDP_MC: %s serializer removed\n",serType);
+
+
+       return CELIX_SUCCESS;
+}
+/**
+ * Computes the match score of this admin for the given endpoint.
+ *
+ * Delegates to pubsub_admin_match with the endpoint's topic properties,
+ * PUBSUB_ADMIN_TYPE and the currently known serializer list; the list is
+ * read while holding serializerListLock.
+ *
+ * admin    - the pubsub admin
+ * endpoint - endpoint whose topic_props are evaluated
+ * score    - output, filled in by pubsub_admin_match
+ *
+ * Returns the status reported by pubsub_admin_match.
+ */
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&admin->serializerListLock);
+       status = 
pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       return status;
+}
+
+/* Selects the serializer to use for an endpoint; this one recalls the same
+ * logic as the match function.
+ *
+ * Delegates to pubsub_admin_get_best_serializer with the endpoint's topic
+ * properties and the currently known serializer list, which is read while
+ * holding serializerListLock.
+ *
+ * admin  - the pubsub admin
+ * ep     - endpoint whose topic_props are evaluated
+ * serSvc - output, filled in by pubsub_admin_get_best_serializer (callers in
+ *          this file treat a NULL result as "no serializer available")
+ *
+ * Returns the status reported by pubsub_admin_get_best_serializer.
+ */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
+
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&admin->serializerListLock);
+       status = pubsub_admin_get_best_serializer(ep->topic_props, 
admin->serializerList, serSvc);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       return status;
+
+}
+/**
+ * Records that a topic publication or topic subscription uses a serializer.
+ *
+ * admin         - the pubsub admin owning the per-serializer maps
+ * serializer    - serializer service pointer, used as the map key
+ * topicPubSub   - the topic publication or topic subscription to register
+ * isPublication - true selects topicPublicationsPerSerializer, false
+ *                 selects topicSubscriptionsPerSerializer
+ *
+ * The per-serializer list is created on first use. The whole update runs
+ * under usedSerializersLock.
+ */
+static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication){
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+       array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
+       if(list==NULL){ /* first user of this serializer: create its list */
+               arrayList_create(&list);
+               hashMap_put(map,serializer,list);
+       }
+       arrayList_add(list,topicPubSub);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}
+/**
+ * Forgets the association between a topic publication or topic subscription
+ * and whichever serializer it was registered under.
+ *
+ * admin         - the pubsub admin owning the per-serializer maps
+ * topicPubSub   - the topic publication or topic subscription to unregister
+ * isPublication - true selects topicPublicationsPerSerializer, false
+ *                 selects topicSubscriptionsPerSerializer
+ *
+ * The serializer is not passed in, so every per-serializer list of the
+ * selected map is scanned until one reports a successful removal. A list
+ * that becomes empty is left in the map. The whole scan runs under
+ * usedSerializersLock.
+ */
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication){
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+       hash_map_iterator_pt iter = hashMapIterator_create(map);
+       while(hashMapIterator_hasNext(iter)){
+               array_list_pt list = 
(array_list_pt)hashMapIterator_nextValue(iter);
+               if(arrayList_removeElement(list, topicPubSub)){ //Found it!
+                       break;
+               }
+       }
+       hashMapIterator_destroy(iter);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}

Reply via email to