http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index e919c9c..9316506 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -57,7 +57,7 @@
 #include "service_factory.h"
 
 #include "topic_subscription.h"
-#include "pubsub_publish_service_private.h"
+#include "topic_publication.h"
 #include "pubsub_endpoint.h"
 #include "pubsub_utils.h"
 #include "subscriber.h"
@@ -66,17 +66,20 @@
 
 static const char *DEFAULT_IP = "127.0.0.1";
 
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip);
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip);
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, 
pubsub_endpoint_pt psEP, double* score);
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication);
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin) {
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
        if (!zsys_has_curve()){
-               printf("PSA: zeromq curve unsupported\n");
+               printf("PSA_ZMQ: zeromq curve unsupported\n");
                return CELIX_SERVICE_EXCEPTION;
        }
 #endif
@@ -95,14 +98,19 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                (*admin)->subscriptions = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
                (*admin)->pendingSubscriptions = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
                (*admin)->externalPublications = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+               (*admin)->topicSubscriptionsPerSerializer = 
hashMap_create(NULL, NULL, NULL, NULL);
+               (*admin)->topicPublicationsPerSerializer  = 
hashMap_create(NULL, NULL, NULL, NULL);
+               arrayList_create(&((*admin)->noSerializerSubscriptions));
+               arrayList_create(&((*admin)->noSerializerPublications));
+               arrayList_create(&((*admin)->serializerList));
 
                celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
                celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+               celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
NULL);
                celixThreadMutex_create(&(*admin)->externalPublicationsLock, 
NULL);
-
-               
celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
-               
celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
-               celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
&(*admin)->pendingSubscriptionsAttr);
+               celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, 
NULL);
+               celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+               celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 
                if (logHelper_create(context, &(*admin)->loghelper) == 
CELIX_SUCCESS) {
                        logHelper_start((*admin)->loghelper);
@@ -115,8 +123,8 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                        const char *interface = NULL;
 
                        bundleContext_getProperty(context, PSA_ITF, &interface);
-                       if (pubsubAdmin_getIpAddress(interface, &detectedIp) != 
CELIX_SUCCESS) {
-                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP adress for interface %s", 
interface);
+                       if (pubsubAdmin_getIpAdress(interface, &detectedIp) != 
CELIX_SUCCESS) {
+                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface 
%s", interface);
                        }
 
                        ip = detectedIp;
@@ -124,11 +132,11 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
 #endif
 
                if (ip != NULL) {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", ip);
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
                        (*admin)->ipAddress = strdup(ip);
                }
                else {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. 
Using %s", DEFAULT_IP);
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. 
Using %s", DEFAULT_IP);
                        (*admin)->ipAddress = strdup(DEFAULT_IP);
                }
 
@@ -136,24 +144,24 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                        free(detectedIp);
                }
 
-        const char* basePortStr = NULL;
-        const char* maxPortStr = NULL;
-        char* endptrBase = NULL;
-        char* endptrMax = NULL;
-        bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, 
"PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
-        bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, 
"PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
-        (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
-        (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
-        if (*endptrBase != '\0') {
-            (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
-        }
-        if (*endptrMax != '\0') {
-            (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
-        }
-
-        printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, 
(*admin)->maxPort);
-
-        // Disable Signal Handling by CZMQ
+               const char* basePortStr = NULL;
+               const char* maxPortStr = NULL;
+               char* endptrBase = NULL;
+               char* endptrMax = NULL;
+               bundleContext_getPropertyWithDefault(context, 
PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
+               bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, 
"PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
+               (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
+               (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
+               if (*endptrBase != '\0') {
+                       (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
+               }
+               if (*endptrMax != '\0') {
+                       (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
+               }
+
+               printf("PSA Using base port %u to max port %u\n", 
(*admin)->basePort, (*admin)->maxPort);
+
+               // Disable Signal Handling by CZMQ
                setenv("ZSYS_SIGHANDLER", "false", true);
 
                const char *nrZmqThreads = NULL;
@@ -164,8 +172,8 @@ celix_status_t pubsubAdmin_create(bundle_context_pt 
context, pubsub_admin_pt *ad
                        unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 
10);
                        if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads 
< 50) {
                                zsys_set_io_threads(nrThreads);
-                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA: Using %d threads for ZMQ", nrThreads);
-                               printf("PSA: Using %d threads for ZMQ\n", 
nrThreads);
+                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads);
+                               printf("PSA_ZMQ: Using %d threads for ZMQ\n", 
nrThreads);
                        }
                }
 
@@ -227,8 +235,38 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
        hashMap_destroy(admin->externalPublications,false,false);
        celixThreadMutex_unlock(&admin->externalPublicationsLock);
 
+       celixThreadMutex_lock(&admin->serializerListLock);
+       arrayList_destroy(admin->serializerList);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+       arrayList_destroy(admin->noSerializerSubscriptions);
+       arrayList_destroy(admin->noSerializerPublications);
+       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
+       while(hashMapIterator_hasNext(iter)){
+               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
+
+       iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
+       while(hashMapIterator_hasNext(iter)){
+               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+       celixThreadMutex_destroy(&admin->usedSerializersLock);
+       celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+       celixThreadMutex_destroy(&admin->serializerListLock);
        celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
-       celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
        celixThreadMutex_destroy(&admin->subscriptionsLock);
        celixThreadMutex_destroy(&admin->localPublicationsLock);
        celixThreadMutex_destroy(&admin->externalPublicationsLock);
@@ -258,8 +296,16 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
        if(any_sub==NULL){
 
                int i;
-
-               status += pubsub_topicSubscriptionCreate(admin->bundle_context, 
subEP, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, 
PUBSUB_ANY_SUB_TOPIC, &any_sub);
+               pubsub_serializer_service_t *best_serializer = NULL;
+               if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
+                       status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
+               }
+               else{
+                       printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                       arrayList_add(admin->noSerializerSubscriptions,subEP);
+                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+               }
 
                if (status == CELIX_SUCCESS){
 
@@ -278,6 +324,7 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
                                                        status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
                                                }
                                        }
+                                       arrayList_destroy(topic_publishers);
                                }
                        }
                        hashMapIterator_destroy(lp_iter);
@@ -309,6 +356,7 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 
                if (status == CELIX_SUCCESS){
                        
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+                       connectTopicPubSubToSerializer(admin, best_serializer, 
any_sub, false);
                }
 
        }
@@ -321,18 +369,17 @@ static celix_status_t 
pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
+       printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
 
        if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
                return pubsubAdmin_addAnySubscription(admin,subEP);
        }
 
-       celixThreadMutex_lock(&admin->subscriptionsLock);
        /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
        celixThreadMutex_lock(&admin->localPublicationsLock);
        celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-    char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
 
        service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
        array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -341,12 +388,22 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
                celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
                pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
                celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-       } else {
+       }
+       else{
                int i;
                topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
 
                if(subscription == NULL) {
-                       status += 
pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, 
admin->serializerSvc, subEP->scope, subEP->topic, &subscription);
+                       pubsub_serializer_service_t *best_serializer = NULL;
+                       if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
+                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, 
subEP->topic, best_serializer, &subscription);
+                       }
+                       else{
+                               printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
 
                        if (status==CELIX_SUCCESS){
 
@@ -362,6 +419,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                                                                status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
                                                        }
                                                }
+                                               
arrayList_destroy(topic_publishers);
                                        }
 
                                }
@@ -383,7 +441,10 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                        }
 
                        if(status==CELIX_SUCCESS){
+                               
celixThreadMutex_lock(&admin->subscriptionsLock);
                                
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+                               
celixThreadMutex_unlock(&admin->subscriptionsLock);
+                               connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false);
                        }
                }
 
@@ -392,10 +453,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint
                }
        }
 
-    free(scope_topic);
+       free(scope_topic);
        celixThreadMutex_unlock(&admin->externalPublicationsLock);
        celixThreadMutex_unlock(&admin->localPublicationsLock);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
 
        return status;
 
@@ -404,11 +464,12 @@ celix_status_t 
pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
+       printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
 
        celixThreadMutex_lock(&admin->subscriptionsLock);
 
-       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,subEP->topic);
+       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
 
        if(sub!=NULL){
                pubsub_topicDecreaseNrSubscribers(sub);
@@ -417,9 +478,16 @@ celix_status_t 
pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
                }
        }
        else{
-               status = CELIX_ILLEGAL_STATE;
+               /* Maybe the endpoint was pending */
+               celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+               if(!arrayList_removeElement(admin->noSerializerSubscriptions, 
subEP)){
+                       status = CELIX_ILLEGAL_STATE;
+               }
+               celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
        }
 
+       free(scope_topic);
+
        celixThreadMutex_unlock(&admin->subscriptionsLock);
 
        return status;
@@ -427,114 +495,127 @@ celix_status_t 
pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 }
 
 celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, 
pubEP->topic);
-
-    const char* fwUUID = NULL;
-
-    char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-    bundleContext_getProperty(admin->bundle_context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-    if (fwUUID == NULL) {
-        printf("PSA: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
-
-    if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
-
-        celixThreadMutex_lock(&admin->localPublicationsLock);
-
-        service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
-
-        if (factory == NULL) {
-            topic_publication_pt pub = NULL;
-            status = pubsub_topicPublicationCreate(admin->bundle_context, 
pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
-                       pubsub_topicPublicationSetSerializer(pub, 
admin->serializerSvc); //TODO serializer back to constructor. This is certainly 
when
-                       //TODO admin are created for every available serializer
-            if (status == CELIX_SUCCESS) {
-                status = pubsub_topicPublicationStart(admin->bundle_context, 
pub, &factory);
-                if (status == CELIX_SUCCESS && factory != NULL) {
-                    hashMap_put(admin->localPublications, strdup(scope_topic), 
factory);
-                }
-            } else {
-                printf("PSA: Cannot create a topicPublication for scope=%s, 
topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID);
-            }
-        } else {
-            //just add the new EP to the list
-            topic_publication_pt pub = (topic_publication_pt) factory->handle;
-            pubsub_topicPublicationAddPublisherEP(pub, pubEP);
-        }
-
-        celixThreadMutex_unlock(&admin->localPublicationsLock);
-    } else {
-
-        celixThreadMutex_lock(&admin->externalPublicationsLock);
-        array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
-        if (ext_pub_list == NULL) {
-            arrayList_create(&ext_pub_list);
-            hashMap_put(admin->externalPublications, strdup(scope_topic), 
ext_pub_list);
-        }
-
-        arrayList_add(ext_pub_list, pubEP);
-
-        celixThreadMutex_unlock(&admin->externalPublicationsLock);
-    }
-
-    /* Re-evaluate the pending subscriptions */
-    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-
-    hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
-    if (pendingSub != NULL) { //There were pending subscription for the just 
published topic. Let's connect them.
-        char* topic = (char*) hashMapEntry_getKey(pendingSub);
-        array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
-        int i;
-        for (i = 0; i < arrayList_size(pendingSubList); i++) {
-            pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) 
arrayList_get(pendingSubList, i);
-            pubsubAdmin_addSubscription(admin, subEP);
-        }
-        hashMap_remove(admin->pendingSubscriptions, scope_topic);
-        arrayList_clear(pendingSubList);
-        arrayList_destroy(pendingSubList);
-        free(topic);
-    }
-
-    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-    /* Connect the new publisher to the subscription for his topic, if there 
is any */
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-
-    topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
-    if (sub != NULL && pubEP->endpoint != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
-    }
-
-    /* And check also for ANY subscription */
-    topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-    if (any_sub != NULL && pubEP->endpoint != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
-    }
-    free(scope_topic);
-
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    return status;
+       celix_status_t status = CELIX_SUCCESS;
+
+       printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, 
pubEP->topic);
+
+       const char* fwUUID = NULL;
+
+       bundleContext_getProperty(admin->bundle_context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+       if (fwUUID == NULL) {
+               printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+               return CELIX_INVALID_BUNDLE_CONTEXT;
+       }
+
+       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
+       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
+
+               celixThreadMutex_lock(&admin->localPublicationsLock);
+
+               service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
+
+               if (factory == NULL) {
+                       topic_publication_pt pub = NULL;
+                       pubsub_serializer_service_t *best_serializer = NULL;
+                       if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, 
&best_serializer)) == CELIX_SUCCESS){
+                               status = 
pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, 
admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+                       }
+                       else{
+                               printf("PSA_ZMQ: Cannot find a serializer for 
publishing topic %s. Adding it to pending list.\n", pubEP->topic);
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerPublications,pubEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+
+                       if (status == CELIX_SUCCESS) {
+                               status = 
pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+                               if (status == CELIX_SUCCESS && factory != NULL) 
{
+                                       hashMap_put(admin->localPublications, 
strdup(scope_topic), factory);
+                                       connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
+                               }
+                       } else {
+                               printf("PSA_ZMQ: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
+                       }
+               } else {
+                       //just add the new EP to the list
+                       topic_publication_pt pub = (topic_publication_pt) 
factory->handle;
+                       pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+               }
+
+               celixThreadMutex_unlock(&admin->localPublicationsLock);
+       }
+       else{
+
+               celixThreadMutex_lock(&admin->externalPublicationsLock);
+               array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
+               if (ext_pub_list == NULL) {
+                       arrayList_create(&ext_pub_list);
+                       hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list);
+               }
+
+               arrayList_add(ext_pub_list, pubEP);
+
+               celixThreadMutex_unlock(&admin->externalPublicationsLock);
+       }
+
+       /* Re-evaluate the pending subscriptions */
+       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+
+       hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+       if (pendingSub != NULL) { //There were pending subscription for the 
just published topic. Let's connect them.
+               char* topic = (char*) hashMapEntry_getKey(pendingSub);
+               array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
+               int i;
+               for (i = 0; i < arrayList_size(pendingSubList); i++) {
+                       pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) 
arrayList_get(pendingSubList, i);
+                       pubsubAdmin_addSubscription(admin, subEP);
+               }
+               hashMap_remove(admin->pendingSubscriptions, scope_topic);
+               arrayList_clear(pendingSubList);
+               arrayList_destroy(pendingSubList);
+               free(topic);
+       }
+
+       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+       /* Connect the new publisher to the subscription for his topic, if 
there is any */
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+
+       topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
+       if (sub != NULL && pubEP->endpoint != NULL) {
+               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
+       }
+
+       /* And check also for ANY subscription */
+       topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+       if (any_sub != NULL && pubEP->endpoint != NULL) {
+               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
+       }
+
+       free(scope_topic);
+
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       return status;
 
 }
 
 celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
        celix_status_t status = CELIX_SUCCESS;
-        int count = 0;
+       int count = 0;
 
-       printf("PSA: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
+       printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
 
        const char* fwUUID = NULL;
 
        
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
        if(fwUUID==NULL){
-               printf("PSA: Cannot retrieve fwUUID.\n");
+               printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
                return CELIX_INVALID_BUNDLE_CONTEXT;
        }
        char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
        if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
 
                celixThreadMutex_lock(&admin->localPublicationsLock);
@@ -545,7 +626,12 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                        pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
                }
                else{
-                       status = CELIX_ILLEGAL_STATE;
+                       /* Maybe the endpoint was pending */
+                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                       
if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+                               status = CELIX_ILLEGAL_STATE;
+                       }
+                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
                }
 
                celixThreadMutex_unlock(&admin->localPublicationsLock);
@@ -561,20 +647,18 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                                pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
                                found = pubsubEndpoint_equals(pubEP,p);
                                if (found){
-                                       found = true;
                                        arrayList_remove(ext_pub_list,i);
                                }
                        }
                        // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
-                       int ext_pub_list_size = arrayList_size(ext_pub_list);
-                       for(i=0; i<ext_pub_list_size; i++) {
+                       for(i=0; i<arrayList_size(ext_pub_list);i++) {
                                pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
                                if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
                                        count++;
                                }
                        }
 
-                       if(ext_pub_list_size == 0){
+                       if(arrayList_size(ext_pub_list)==0){
                                hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic);
                                char* topic = (char*)hashMapEntry_getKey(entry);
                                array_list_pt list = 
(array_list_pt)hashMapEntry_getValue(entry);
@@ -582,7 +666,6 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
                                arrayList_destroy(list);
                                free(topic);
                        }
-
                }
 
                celixThreadMutex_unlock(&admin->externalPublicationsLock);
@@ -599,8 +682,9 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
        /* And check also for ANY subscription */
        topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
        if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
        }
+
        free(scope_topic);
        celixThreadMutex_unlock(&admin->subscriptionsLock);
 
@@ -611,20 +695,21 @@ celix_status_t 
pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char 
*scope, char* topic){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Closing all publications\n");
+       printf("PSA_ZMQ: Closing all publications\n");
 
        celixThreadMutex_lock(&admin->localPublicationsLock);
        char *scope_topic = createScopeTopicKey(scope, topic);
        hash_map_entry_pt pubsvc_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
        if(pubsvc_entry!=NULL){
-               char* topic = (char*)hashMapEntry_getKey(pubsvc_entry);
+               char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
                service_factory_pt factory= 
(service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
                topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
                status += pubsub_topicPublicationStop(pub);
                status += pubsub_topicPublicationDestroy(pub);
 
+               disconnectTopicPubSubFromSerializer(admin, pub, true);
                hashMap_remove(admin->localPublications,scope_topic);
-               free(topic);
+               free(key);
                free(factory);
        }
        free(scope_topic);
@@ -637,7 +722,7 @@ celix_status_t 
pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *sco
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope,char* topic){
        celix_status_t status = CELIX_SUCCESS;
 
-       printf("PSA: Closing all subscriptions\n");
+       printf("PSA_ZMQ: Closing all subscriptions\n");
 
        celixThreadMutex_lock(&admin->subscriptionsLock);
        char *scope_topic = createScopeTopicKey(scope, topic);
@@ -649,6 +734,7 @@ celix_status_t 
pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
                status += pubsub_topicSubscriptionStop(ts);
                status += pubsub_topicSubscriptionDestroy(ts);
 
+               disconnectTopicPubSubFromSerializer(admin, ts, false);
                hashMap_remove(admin->subscriptions,scope_topic);
                free(topic);
 
@@ -660,95 +746,9 @@ celix_status_t 
pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 
 }
 
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-       status = pubsubAdmin_match(admin, pubEP, score);
-       return status;
-}
-
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-       status = pubsubAdmin_match(admin, subEP, score);
-       return status;
-}
-
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc) {
-       celix_status_t status = CELIX_SUCCESS;
-       admin->serializerSvc = serializerSvc;
-
-       /* Add serializer to all topic_publication_pt */
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       hash_map_iterator_pt lp_iter = 
hashMapIterator_create(admin->localPublications);
-       while(hashMapIterator_hasNext(lp_iter)){
-               service_factory_pt factory = (service_factory_pt) 
hashMapIterator_nextValue(lp_iter);
-               topic_publication_pt topic_pub = (topic_publication_pt) 
factory->handle;
-               pubsub_topicPublicationSetSerializer(topic_pub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(lp_iter);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       /* Add serializer to all topic_subscription_pt */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       hash_map_iterator_pt subs_iter = 
hashMapIterator_create(admin->subscriptions);
-       while(hashMapIterator_hasNext(subs_iter)){
-               topic_subscription_pt topic_sub = (topic_subscription_pt) 
hashMapIterator_nextValue(subs_iter);
-               pubsub_topicSubscriptionSetSerializer(topic_sub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(subs_iter);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-}
-
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
-       celix_status_t status = CELIX_SUCCESS;
-       admin->serializerSvc = NULL;
-
-       /* Remove serializer from all topic_publication_pt */
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       hash_map_iterator_pt lp_iter = 
hashMapIterator_create(admin->localPublications);
-       while(hashMapIterator_hasNext(lp_iter)){
-               service_factory_pt factory = (service_factory_pt) 
hashMapIterator_nextValue(lp_iter);
-               topic_publication_pt topic_pub = (topic_publication_pt) 
factory->handle;
-               pubsub_topicPublicationRemoveSerializer(topic_pub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(lp_iter);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       /* Remove serializer from all topic_subscription_pt */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       hash_map_iterator_pt subs_iter = 
hashMapIterator_create(admin->subscriptions);
-       while(hashMapIterator_hasNext(subs_iter)){
-               topic_subscription_pt topic_sub = (topic_subscription_pt) 
hashMapIterator_nextValue(subs_iter);
-               pubsub_topicSubscriptionRemoveSerializer(topic_sub, 
admin->serializerSvc);
-       }
-       hashMapIterator_destroy(subs_iter);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-}
-
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, 
pubsub_endpoint_pt psEP, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-
-       char topic_psa_prop[1024];
-       snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic);
-
-       const char* psa_to_use = NULL;
-       bundleContext_getPropertyWithDefault(admin->bundle_context, 
topic_psa_prop, PSA_DEFAULT, &psa_to_use);
-
-       *score = 0;
-       if (strcmp(psa_to_use, "zmq") == 0){
-               *score += 100;
-       }else{
-               *score += 1;
-       }
-
-       return status;
-}
 
 #ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip) {
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip) {
        celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 
        struct ifaddrs *ifaddr, *ifa;
@@ -792,3 +792,237 @@ static celix_status_t 
pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt a
        free(scope_topic);
        return status;
 }
+
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
+       /* Registers a newly available serializer reference with the admin (passed as handle)
+        * and then retries every endpoint parked on the noSerializer pending lists.
+        *
+        * Assumption: serializers are all available at startup.
+        * If a new (possibly better) serializer is installed and started, topic
+        * publications/subscriptions that already exist are neither destroyed nor recreated.
+        *
+        * The service argument is not used here. Returns CELIX_SERVICE_EXCEPTION when the
+        * reference carries no PUBSUB_SERIALIZER_TYPE_KEY property, CELIX_SUCCESS otherwise. */
+       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+       const char *serType = NULL;
+       int idx;
+
+       serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY, &serType);
+       if(serType == NULL){
+               printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       /* Make the reference visible to the serializer selection */
+       celixThreadMutex_lock(&admin->serializerListLock);
+       arrayList_add(admin->serializerList, reference);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       /* Re-evaluate the pending endpoints; this function itself never removes an entry from the pending lists */
+       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+       idx = 0;
+       while(idx < arrayList_size(admin->noSerializerSubscriptions)){
+               pubsub_serializer_service_t *candidate = NULL;
+               pubsub_endpoint_pt pendingSub = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions, idx);
+               pubsubAdmin_getBestSerializer(admin, pendingSub, &candidate);
+               if(candidate != NULL){
+                       /* A usable serializer exists now: retry the subscription */
+                       pubsubAdmin_addSubscription(admin, pendingSub);
+               }
+               idx++;
+       }
+
+       idx = 0;
+       while(idx < arrayList_size(admin->noSerializerPublications)){
+               pubsub_serializer_service_t *candidate = NULL;
+               pubsub_endpoint_pt pendingPub = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications, idx);
+               pubsubAdmin_getBestSerializer(admin, pendingPub, &candidate);
+               if(candidate != NULL){
+                       /* A usable serializer exists now: retry the publication */
+                       pubsubAdmin_addPublication(admin, pendingPub);
+               }
+               idx++;
+       }
+
+       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+       printf("PSA_ZMQ: %s serializer added\n",serType);
+
+       return CELIX_SUCCESS;
+}
+
+/* Handles the removal of a serializer: drops the reference from admin->serializerList,
+ * then tears down every topic publication and topic subscription that is registered
+ * under `service` in the per-serializer maps. The endpoints of the destroyed
+ * publications/subscriptions are appended to the noSerializer pending lists.
+ *
+ * handle    - the pubsub admin (cast to pubsub_admin_pt)
+ * reference - service reference of the serializer being removed
+ * service   - serializer service pointer, used as key in the per-serializer maps
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no PUBSUB_SERIALIZER_TYPE_KEY
+ * property, CELIX_SUCCESS otherwise; the results of the individual teardown calls are
+ * not checked. */
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
+
+       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+       int i=0, j=0;
+       const char *serType = NULL;
+
+       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType == NULL){
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       /* NOTE(review): both locks stay held for the whole teardown below, which takes
+        * localPublicationsLock and subscriptionsLock while usedSerializersLock is held.
+        * pubsubAdmin_closeAllPublications/closeAllSubscriptions acquire those locks first
+        * and usedSerializersLock second (via disconnectTopicPubSubFromSerializer), i.e. in
+        * the opposite order - confirm this cannot deadlock. */
+       celixThreadMutex_lock(&admin->serializerListLock);
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+
+       /* Remove the serializer from the list */
+       arrayList_removeElement(admin->serializerList, reference);
+
+       /* Now destroy the topicPublications, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+       array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+       if(topicPubList!=NULL){
+               for(i=0;i<arrayList_size(topicPubList);i++){
+                       topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
+                       /* Stop the topic publication */
+                       pubsub_topicPublicationStop(topicPub);
+                       /* Get the endpoints that are going to be orphan */
+                       array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
+                       for(j=0;j<arrayList_size(pubList);j++){
+                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubList,j);
+                               /* Remove the publication */
+                               pubsubAdmin_removePublication(admin, pubEP);
+                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
+                               if(pubEP->endpoint!=NULL){
+                                       free(pubEP->endpoint);
+                                       pubEP->endpoint = NULL;
+                               }
+                               /* Add the orphan endpoint to the noSerializer 
pending list */
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerPublications,pubEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+                       /* pubList is destroyed here, so it is treated as owned by this function */
+                       arrayList_destroy(pubList);
+
+                       /* Cleanup also the localPublications hashmap*/
+                       celixThreadMutex_lock(&admin->localPublicationsLock);
+                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
+                       char *key = NULL;
+                       service_factory_pt factory = NULL;
+                       /* Search the factory whose handle is this topic publication; on a hit the
+                        * loop breaks, so factory and key refer to the same map entry */
+                       while(hashMapIterator_hasNext(iter)){
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
+                               factory = 
(service_factory_pt)hashMapEntry_getValue(entry);
+                               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
+                               if(pub==topicPub){
+                                       key = (char*)hashMapEntry_getKey(entry);
+                                       break;
+                               }
+                       }
+                       hashMapIterator_destroy(iter);
+                       /* key stays NULL when no entry matched; nothing is removed or freed then */
+                       if(key!=NULL){
+                               hashMap_remove(admin->localPublications, key);
+                               free(factory);
+                               free(key);
+                       }
+                       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+                       /* Finally destroy the topicPublication */
+                       pubsub_topicPublicationDestroy(topicPub);
+               }
+               arrayList_destroy(topicPubList);
+       }
+
+       /* Now destroy the topicSubscriptions, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+       array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+       if(topicSubList!=NULL){
+               for(i=0;i<arrayList_size(topicSubList);i++){
+                       topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
+                       /* Stop the topic subscription */
+                       pubsub_topicSubscriptionStop(topicSub);
+                       /* Get the endpoints that are going to be orphan */
+                       /* NOTE(review): unlike pubList above, subList is never destroyed here -
+                        * confirm whether pubsub_topicSubscriptionGetSubscribersList returns a copy */
+                       array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
+                       for(j=0;j<arrayList_size(subList);j++){
+                               pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(subList,j);
+                               /* Remove the subscription */
+                               pubsubAdmin_removeSubscription(admin, subEP);
+                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
+                               if(subEP->endpoint!=NULL){
+                                       free(subEP->endpoint);
+                                       subEP->endpoint = NULL;
+                               }
+                               /* Add the orphan endpoint to the noSerializer 
pending list */
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+
+                       /* Cleanup also the subscriptions hashmap*/
+                       celixThreadMutex_lock(&admin->subscriptionsLock);
+                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
+                       char *key = NULL;
+                       /* Search the map entry whose value is this topic subscription */
+                       while(hashMapIterator_hasNext(iter)){
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
+                               topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
+                               if(sub==topicSub){
+                                       key = (char*)hashMapEntry_getKey(entry);
+                                       break;
+                               }
+                       }
+                       hashMapIterator_destroy(iter);
+                       if(key!=NULL){
+                               hashMap_remove(admin->subscriptions, key);
+                               free(key);
+                       }
+                       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+                       /* Finally destroy the topicSubscription */
+                       pubsub_topicSubscriptionDestroy(topicSub);
+               }
+               arrayList_destroy(topicSubList);
+       }
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       printf("PSA_ZMQ: %s serializer removed\n",serType);
+
+
+       return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
+       /* Scores how well this admin (PUBSUB_ADMIN_TYPE) fits the endpoint's topic properties,
+        * given the serializers currently known. The score is written through `score` by
+        * pubsub_admin_match, whose status is returned unchanged. */
+       celix_status_t matchStatus;
+
+       /* serializerList is read while holding its lock */
+       celixThreadMutex_lock(&admin->serializerListLock);
+       matchStatus = pubsub_admin_match(endpoint->topic_props, PUBSUB_ADMIN_TYPE, admin->serializerList, score);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       return matchStatus;
+}
+
+/* Looks up the serializer to use for the given endpoint. According to the original
+ * author's note, this follows the same logic as the match function.
+ * The selected service is stored through serSvc by pubsub_admin_get_best_serializer,
+ * whose status is returned unchanged. */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
+       celix_status_t lookupStatus;
+
+       /* serializerList is read while holding its lock */
+       celixThreadMutex_lock(&admin->serializerListLock);
+       lookupStatus = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       return lookupStatus;
+}
+
+/* Records that topicPubSub (a topic publication when isPublication is true, a topic
+ * subscription otherwise) uses the given serializer, by appending it to the list stored
+ * under that serializer in the matching per-serializer map. The list is created on first
+ * use. If the list cannot be created the association is not recorded and a message is
+ * printed. */
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+       array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
+       if(list==NULL){
+               /* First user of this serializer: never store or dereference a NULL list if the allocation failed */
+               if(arrayList_create(&list) != CELIX_SUCCESS || list == NULL){
+                       printf("PSA_ZMQ: Cannot create the list of topic %s for serializer %p\n", isPublication?"publications":"subscriptions", (void*)serializer);
+                       celixThreadMutex_unlock(&admin->usedSerializersLock);
+                       return;
+               }
+               hashMap_put(map,serializer,list);
+       }
+       arrayList_add(list,topicPubSub);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}
+
+/* Removes topicPubSub from the per-serializer bookkeeping: the publication map is searched
+ * when isPublication is true, the subscription map otherwise. The search stops at the first
+ * list the element could be removed from; the list itself stays in the map. */
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+       hash_map_pt perSerializer;
+       hash_map_iterator_pt cursor;
+       bool removed = false;
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       if(isPublication){
+               perSerializer = admin->topicPublicationsPerSerializer;
+       }
+       else{
+               perSerializer = admin->topicSubscriptionsPerSerializer;
+       }
+
+       cursor = hashMapIterator_create(perSerializer);
+       while(!removed && hashMapIterator_hasNext(cursor)){
+               array_list_pt users = (array_list_pt)hashMapIterator_nextValue(cursor);
+               removed = arrayList_removeElement(users, topicPubSub);
+       }
+       hashMapIterator_destroy(cursor);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 2eaad97..b741771 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -7,7 +7,7 @@
  *"License"); you may not use this file except in compliance
  *with the License.  You may obtain a copy of the License at
  *
- *  http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
  *Unless required by applicable law or agreed to in writing,
  *software distributed under the License is distributed on an
@@ -24,7 +24,6 @@
  *  \copyright Apache License, Version 2.0
  */
 
-#include "pubsub_publish_service_private.h"
 #include <czmq.h>
 /* The following undefs prevent the collision between:
  * - sys/syslog.h (which is included within czmq)
@@ -50,6 +49,10 @@
 #include "pubsub_utils.h"
 #include "publisher.h"
 
+#include "topic_publication.h"
+
+#include "pubsub_serializer.h"
+
 #ifdef BUILD_WITH_ZMQ_SECURITY
        #include "zmq_crypto.h"
 
@@ -69,21 +72,21 @@ struct topic_publication {
        service_registration_pt svcFactoryReg;
        array_list_pt pub_ep_list; //List<pubsub_endpoint>
        hash_map_pt boundServices; //<bundle_pt,bound_service>
-       celix_thread_mutex_t tp_lock; // Protects topic_publication data 
structure
-       pubsub_serializer_service_t* serializerSvc;
+       pubsub_serializer_service_t *serializer;
+       celix_thread_mutex_t tp_lock;
 };
 
 typedef struct publish_bundle_bound_service {
        topic_publication_pt parent;
-       pubsub_publisher_t pubSvc;
+       pubsub_publisher_t service;
        bundle_pt bundle;
        char *topic;
-       pubsub_msg_serializer_map_t* map;
+       hash_map_pt msgTypes;
        unsigned short getCount;
        celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service 
data structure
        bool mp_send_in_progress;
        array_list_pt mp_parts;
-} publish_bundle_bound_service_t;
+}* publish_bundle_bound_service_pt;
 
 /* Note: correct locking order is
  * 1. tp_lock
@@ -93,27 +96,27 @@ typedef struct publish_bundle_bound_service {
  * tp_lock and socket_lock are independent.
  */
 
-typedef struct pubsub_msg {
+typedef struct pubsub_msg{
        pubsub_msg_header_pt header;
        char* payload;
        int payloadSize;
-} pubsub_msg_t;
+}* pubsub_msg_pt;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc);
+static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *msg, int flags);
+static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *inMsg, int flags);
 static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int 
maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* 
bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -128,7 +131,7 @@ celix_status_t 
pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
                for (i = 0; i < secure_topics_size; i++){
                        char* top = arrayList_get(secure_topics_list, i);
                        if (strcmp(pubEP->topic, top) == 0){
-                               printf("TP: Secure topic: '%s'\n", top);
+                               printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
                                pubEP->is_secure = true;
                        }
                        free(top);
@@ -155,12 +158,12 @@ celix_status_t 
pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
                //certificate path 
".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
                snprintf(cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, 
pubEP->topic);
                free(keys_bundle_dir);
-               printf("TP: Loading key '%s'\n", cert_path);
+               printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
 
                pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, 
(char *) keys_file_name, cert_path);
                if (pub_cert == NULL){
-                       printf("TP: Cannot load key '%s'\n", cert_path);
-                       printf("TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
+                       printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
+                       printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", 
pubEP->topic);
                        pubEP->is_secure = false;
                }
        }
@@ -218,7 +221,7 @@ celix_status_t 
pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
 
        pub->endpoint = ep;
        pub->zmq_socket = socket;
-       pub->serializerSvc = NULL;
+       pub->serializer = best_serializer;
 
        celixThreadMutex_create(&(pub->socket_lock),NULL);
 
@@ -245,13 +248,14 @@ celix_status_t 
pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
        hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
        while(hashMapIterator_hasNext(iter)){
-               publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(iter);
+               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
                pubsub_destroyPublishBundleBoundService(bound);
        }
        hashMapIterator_destroy(iter);
        hashMap_destroy(pub->boundServices,false,false);
 
        pub->svcFactoryReg = NULL;
+       pub->serializer = NULL;
 #ifdef BUILD_WITH_ZMQ_SECURITY
        zcert_destroy(&(pub->zmq_cert));
 #endif
@@ -275,7 +279,6 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
        celix_status_t status = CELIX_SUCCESS;
 
        /* Let's register the new service */
-       //celixThreadMutex_lock(&(pub->tp_lock));
 
        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 
@@ -294,30 +297,22 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 
                if(status != CELIX_SUCCESS){
                        properties_destroy(props);
-                       printf("PSA: Cannot register ServiceFactory for topic 
%s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
+                       printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register 
ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
                }
                else{
                        *svcFactory = factory;
                }
        }
        else{
-               printf("PSA: Cannot find pubsub_endpoint after adding 
it...Should never happen!\n");
+               printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after 
adding it...Should never happen!\n");
                status = CELIX_SERVICE_EXCEPTION;
        }
 
-       //celixThreadMutex_unlock(&(pub->tp_lock));
-
        return status;
 }
 
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-       celix_status_t status = CELIX_SUCCESS;
-
-       //celixThreadMutex_lock(&(pub->tp_lock));
-       status = serviceRegistration_unregister(pub->svcFactoryReg);
-       //celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return status;
+       return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
@@ -345,63 +340,12 @@ celix_status_t 
pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-
-       //clearing pref serializer
-       if (pub->serializerSvc != NULL) {
-               hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-               while (hashMapIterator_hasNext(&iter)) {
-                       publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
-            celixThreadMutex_lock(&bound->mp_lock);
-            
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-                       bound->map = NULL;
-               }
-       }
-
-       pub->serializerSvc = serializerSvc;
-    if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            bundle_pt bundle = hashMapEntry_getKey(entry);
-            publish_bundle_bound_service_t* bound = 
hashMapEntry_getValue(entry);
-            celixThreadMutex_lock(&bound->mp_lock);
-            
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-        }
-    }
-
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* svc){
-       celix_status_t status = CELIX_SUCCESS;
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
+       array_list_pt list = NULL;
        celixThreadMutex_lock(&(pub->tp_lock));
-
-       if (pub->serializerSvc == svc) {
-               hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-               while (hashMapIterator_hasNext(&iter)) {
-                       publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
-            celixThreadMutex_lock(&bound->mp_lock);
-                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-                       bound->map = NULL;
-               }
-               pub->serializerSvc = NULL;
-       }
-
+       list = arrayList_clone(pub->pub_ep_list);
        celixThreadMutex_unlock(&(pub->tp_lock));
-       return status;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
-       return pub->pub_ep_list;
+       return list;
 }
 
 
@@ -412,7 +356,7 @@ static celix_status_t 
pubsub_topicPublicationGetService(void* handle, bundle_pt
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices,bundle);
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
        if(bound==NULL){
                bound = pubsub_createPublishBundleBoundService(publish,bundle);
                if(bound!=NULL){
@@ -423,9 +367,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
                bound->getCount++;
        }
 
-       if(bound!=NULL){
-               *service = &bound->pubSvc;
-       }
+       *service = &bound->service;
 
        celixThreadMutex_unlock(&(publish->tp_lock));
 
@@ -438,7 +380,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
+       publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
        if(bound!=NULL){
 
                bound->getCount--;
@@ -451,7 +393,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
        else{
                long bundleId = -1;
                bundle_getBundleId(bundle,&bundleId);
-               printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
+               printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
        }
 
        /* service should be never used for unget, so let's set the pointer to NULL */
@@ -462,7 +404,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
        return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
 
        bool ret = true;
 
@@ -502,7 +444,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
        unsigned int i = 0;
        unsigned int mp_num = arrayList_size(mp_msg_parts);
        for(;i<mp_num;i++){
-               ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_t*)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
+               ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
        }
        arrayList_clear(mp_msg_parts);
 
@@ -511,44 +453,33 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 }
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
+
        return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
+
 }
 
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){
+static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
+
        int status = 0;
-       publish_bundle_bound_service_t* bound = handle;
-       celixThreadMutex_lock(&(bound->mp_lock));
 
+       publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+
+       celixThreadMutex_lock(&(bound->mp_lock));
        if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
-               printf("TP: Multipart send already in progress. Cannot process a new one.\n");
+               printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
                celixThreadMutex_unlock(&(bound->mp_lock));
                return -3;
-    }
-
-       pubsub_msg_serializer_t* msgSer = NULL;
-       if (bound->map != NULL) {
-               msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId);
        }
 
-    if (bound->map == NULL) {
-        printf("TP: Serializer is not set!\n");
-        status = 1;
-    } else if (msgSer == NULL ){
-        printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
-        hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
-        printf("Note supported messages:\n");
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
-            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
-        }
-        status = 1;
-    }
+       pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
+
+       if (msgSer!= NULL) {
+               int major=0, minor=0;
 
-       int major=0, minor=0;
-       if (status == 0 && msgSer != NULL) {
                pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
                strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
                msg_hdr->type = msgTypeId;
+
                if (msgSer->msgVersion != NULL){
                        version_getMajor(msgSer->msgVersion, &major);
                        version_getMinor(msgSer->msgVersion, &minor);
@@ -556,23 +487,24 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
                        msg_hdr->minor = minor;
                }
 
-               char* serializedOutput = NULL;
+               void *serializedOutput = NULL;
                size_t serializedOutputLen = 0;
-               msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
-               pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
+               msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+
+               pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
                msg->header = msg_hdr;
-               msg->payload = (char *) serializedOutput;
+               msg->payload = (char*)serializedOutput;
                msg->payloadSize = serializedOutputLen;
                bool snd = true;
 
-               switch (flags) {
+               switch(flags){
                case PUBSUB_PUBLISHER_FIRST_MSG:
                        bound->mp_send_in_progress = true;
                        arrayList_add(bound->mp_parts,msg);
                        break;
                case PUBSUB_PUBLISHER_PART_MSG:
                        if(!bound->mp_send_in_progress){
-                               printf("TP: ERROR: received msg part without the first part.\n");
+                               printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
                                status = -4;
                        }
                        else{
@@ -581,72 +513,53 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
                        break;
                case PUBSUB_PUBLISHER_LAST_MSG:
                        if(!bound->mp_send_in_progress){
-                               printf("TP: ERROR: received end msg without the first part.\n");
+                               printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
                                status = -4;
                        }
                        else{
                                arrayList_add(bound->mp_parts,msg);
-                               celixThreadMutex_lock(&(bound->parent->socket_lock));
+                               celixThreadMutex_lock(&(bound->parent->tp_lock));
                                snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
                                bound->mp_send_in_progress = false;
-                               celixThreadMutex_unlock(&(bound->parent->socket_lock));
+                               celixThreadMutex_unlock(&(bound->parent->tp_lock));
                        }
                        break;
                case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:    //Normal send case
-                       celixThreadMutex_lock(&(bound->parent->socket_lock));
+                       celixThreadMutex_lock(&(bound->parent->tp_lock));
                        snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
-                       celixThreadMutex_unlock(&(bound->parent->socket_lock));
+                       celixThreadMutex_unlock(&(bound->parent->tp_lock));
                        break;
                default:
-                       printf("TP: ERROR: Invalid MP flags combination\n");
+                       printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
                        status = -4;
                        break;
                }
 
-               /* Free msg in case we got into a bad branch */
                if(status==-4){
                        free(msg);
                }
 
                if(!snd){
-                       printf("TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
+                       printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
                }
 
        } else {
-               printf("TP: Message %u not supported.\n",msgTypeId);
+        printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId);
                status=-1;
        }
 
        celixThreadMutex_unlock(&(bound->mp_lock));
+
        return status;
-}
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){
-       publish_bundle_bound_service_t* bound = handle;
-       unsigned int msgTypeId = 0;
-
-    celixThreadMutex_lock(&bound->mp_lock);
-       if (bound->map != NULL) {
-               hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
-               while (hashMapIterator_hasNext(&iter)) {
-                       pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-                       if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
-                               msgTypeId = msgSer->msgId;
-                               break;
-                       }
-               }
-       }
-    celixThreadMutex_unlock(&bound->mp_lock);
+}
 
-       if (msgTypeId != 0) {
-               *out = msgTypeId;
-               return 0;
-       } else {
-               printf("TP: Cannot find msg type id for msg type %s\n", msgType);
-               return 1;
-       }
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
+       *msgTypeId = utils_stringHash(msgType);
+       return 0;
 }
 
+
 static unsigned int rand_range(unsigned int min, unsigned int max){
 
        double scaled = (double)(((double)rand())/((double)RAND_MAX));
@@ -654,10 +567,11 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 
 }
 
-static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+
        //PRECOND lock on tp->lock
 
-       publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
+       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 
        if (bound != NULL) {
 
@@ -667,42 +581,41 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
                bound->mp_send_in_progress = false;
                celixThreadMutex_create(&bound->mp_lock,NULL);
 
-               if (tp->serializerSvc != NULL) {
-                       tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
+               if(tp->serializer != NULL){
+                       tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
                }
+
                arrayList_create(&bound->mp_parts);
 
                pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
                bound->topic=strdup(pubEP->topic);
 
-               bound->pubSvc.handle = bound;
-               bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
-               bound->pubSvc.send = pubsub_topicPublicationSend;
-               bound->pubSvc.sendMultipart = pubsub_topicPublicationSendMultipart;
-       }
-       else
-       {
-               free(bound);
-               return NULL;
+               bound->service.handle = bound;
+               bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+               bound->service.send = pubsub_topicPublicationSend;
+               bound->service.sendMultipart = pubsub_topicPublicationSendMultipart;
+
        }
 
        return bound;
 }
 
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){
-       //PRECOND lock on publish->tp_lock
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+
+       //PRECOND lock on tp->lock
+
        celixThreadMutex_lock(&boundSvc->mp_lock);
 
-       if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
-               boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
-               boundSvc->map = NULL;
+
+       if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
+               boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
        }
 
-       if (boundSvc->mp_parts!=NULL) {
+       if(boundSvc->mp_parts!=NULL){
                arrayList_destroy(boundSvc->mp_parts);
        }
 
-       if (boundSvc->topic!=NULL) {
+       if(boundSvc->topic!=NULL){
                free(boundSvc->topic);
        }
 
@@ -718,7 +631,7 @@ static void delay_first_send_for_late_joiners(){
        static bool firstSend = true;
 
        if(firstSend){
-               printf("TP: Delaying first send for late joiners...\n");
+               printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
                sleep(FIRST_SEND_DELAY);
                firstSend = false;
        }

Reply via email to