http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 409c7a5..cf51ed9 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -49,51 +49,49 @@
 #include "pubsub_utils.h"
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
-       #include "zmq_crypto.h"
+#include "zmq_crypto.h"
 
-       #define MAX_CERT_PATH_LENGTH 512
+#define MAX_CERT_PATH_LENGTH 512
 #endif
 
 #define POLL_TIMEOUT   250
 #define ZMQ_POLL_TIMEOUT_MS_ENV        "ZMQ_POLL_TIMEOUT_MS"
 
-struct topic_subscription {
+struct topic_subscription{
+
        zsock_t* zmq_socket;
        zcert_t * zmq_cert;
        zcert_t * zmq_pub_cert;
-       pthread_mutex_t socket_lock; //Protects zmq_socket access
+       pthread_mutex_t socket_lock;
        service_tracker_pt tracker;
        array_list_pt sub_ep_list;
        celix_thread_t recv_thread;
        bool running;
-       celix_thread_mutex_t ts_lock; //Protects topic_subscription data 
structure access
+       celix_thread_mutex_t ts_lock;
        bundle_context_pt context;
 
-       hash_map_pt msgSerializerMapMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
-    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
-       array_list_pt pendingConnections;
-       array_list_pt pendingDisconnections;
+       pubsub_serializer_service_t *serializer;
+
+       hash_map_pt servicesMap; // key = service, value = msg types map
 
        celix_thread_mutex_t pendingConnections_lock;
+       array_list_pt pendingConnections;
+
+       array_list_pt pendingDisconnections;
        celix_thread_mutex_t pendingDisconnections_lock;
+
        unsigned int nrSubscribers;
-       pubsub_serializer_service_t* serializerSvc;
 };
 
-/* Note: correct locking order is
- * 1. socket_lock
- * 2. ts_lock
- */
-
-typedef struct complete_zmq_msg {
+typedef struct complete_zmq_msg{
        zframe_t* header;
        zframe_t* payload;
 }* complete_zmq_msg_pt;
 
-typedef struct mp_handle {
-       pubsub_msg_serializer_map_t* map;
+typedef struct mp_handle{
+       hash_map_pt svc_msg_db;
        hash_map_pt rcv_msg_map;
-} mp_handle_t;
+}* mp_handle_pt;
 
 typedef struct msg_map_entry{
        bool retain;
@@ -107,100 +105,66 @@ static bool checkVersion(version_pt 
msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool 
retain, void **part);
-static mp_handle_t* create_mp_handle(topic_subscription_pt sub, 
pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_t* mp_handle);
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt 
rcv_msg_list);
+static void destroy_mp_handle(mp_handle_pt mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* 
serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* scope, char* topic, pubsub_serializer_service_t 
*best_serializer, topic_subscription_pt* out){
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
-       if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC) != 0){
-               char* secure_topics = NULL;
-               bundleContext_getProperty(bundle_context, "SECURE_TOPICS", 
(const char **) &secure_topics);
-
-               if (secure_topics){
-                       array_list_pt secure_topics_list = 
pubsub_getTopicsFromString(secure_topics);
-
-                       int i;
-                       int secure_topics_size = 
arrayList_size(secure_topics_list);
-                       for (i = 0; i < secure_topics_size; i++){
-                               char* top = arrayList_get(secure_topics_list, 
i);
-                               if (strcmp(topic, top) == 0){
-                                       printf("TS: Secure topic: '%s'\n", top);
-                                       subEP->is_secure = true;
-                               }
-                               free(top);
-                               top = NULL;
-                       }
-
-                       arrayList_destroy(secure_topics_list);
-               }
+       char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+       if (keys_bundle_dir == NULL){
+               return CELIX_SERVICE_EXCEPTION;
        }
 
-       zcert_t* sub_cert = NULL;
-       zcert_t* pub_cert = NULL;
-       const char* pub_key = NULL;
-       if (subEP->is_secure){
-               char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-               if (keys_bundle_dir == NULL){
-                       return CELIX_SERVICE_EXCEPTION;
-               }
-
-               const char* keys_file_path = NULL;
-               const char* keys_file_name = NULL;
-               bundleContext_getProperty(bundle_context, 
PROPERTY_KEYS_FILE_PATH, &keys_file_path);
-               bundleContext_getProperty(bundle_context, 
PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+       const char* keys_file_path = NULL;
+       const char* keys_file_name = NULL;
+       bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, 
&keys_file_path);
+       bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, 
&keys_file_name);
 
-               char sub_cert_path[MAX_CERT_PATH_LENGTH];
-               char pub_cert_path[MAX_CERT_PATH_LENGTH];
+       char sub_cert_path[MAX_CERT_PATH_LENGTH];
+       char pub_cert_path[MAX_CERT_PATH_LENGTH];
 
-               //certificate path 
".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
-               snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
-               snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
-               free(keys_bundle_dir);
+       //certificate path 
".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
+       snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
+       snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
+       free(keys_bundle_dir);
 
-               printf("TS: Loading subscriber key '%s'\n", sub_cert_path);
-               printf("TS: Loading publisher key '%s'\n", pub_cert_path);
-
-               sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, 
(char *) keys_file_name, sub_cert_path);
-               if (sub_cert == NULL){
-                       printf("TS: Cannot load key '%s'\n", sub_cert_path);
-                       printf("TS: Topic '%s' NOT SECURED !\n", topic);
-                       subEP->is_secure = false;
-               }
+       printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", 
sub_cert_path);
+       printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", 
pub_cert_path);
 
-               pub_cert = zcert_load(pub_cert_path);
-               if (sub_cert != NULL && pub_cert == NULL){
-                       zcert_destroy(&sub_cert);
-                       printf("TS: Cannot load key '%s'\n", pub_cert_path);
-                       printf("TS: Topic '%s' NOT SECURED !\n", topic);
-                       subEP->is_secure = false;
-               }
+       zcert_t* sub_cert = get_zcert_from_encoded_file((char *) 
keys_file_path, (char *) keys_file_name, sub_cert_path);
+       if (sub_cert == NULL){
+               printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", 
sub_cert_path);
+               return CELIX_SERVICE_EXCEPTION;
+       }
 
-               pub_key = zcert_public_txt(pub_cert);
+       zcert_t* pub_cert = zcert_load(pub_cert_path);
+       if (pub_cert == NULL){
+               zcert_destroy(&sub_cert);
+               printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", 
pub_cert_path);
+               return CELIX_SERVICE_EXCEPTION;
        }
+
+       const char* pub_key = zcert_public_txt(pub_cert);
 #endif
 
        zsock_t* zmq_s = zsock_new (ZMQ_SUB);
        if(zmq_s==NULL){
-               #ifdef BUILD_WITH_ZMQ_SECURITY
-               if (subEP->is_secure){
-                       zcert_destroy(&sub_cert);
-                       zcert_destroy(&pub_cert);
-               }
-               #endif
+#ifdef BUILD_WITH_ZMQ_SECURITY
+               zcert_destroy(&sub_cert);
+               zcert_destroy(&pub_cert);
+#endif
 
                return CELIX_SERVICE_EXCEPTION;
        }
 
-       #ifdef BUILD_WITH_ZMQ_SECURITY
-       if (subEP->is_secure){
-               zcert_apply (sub_cert, zmq_s);
-               zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of 
publisher to socket of subscriber
-       }
-       #endif
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       zcert_apply (sub_cert, zmq_s);
+       zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to 
socket of subscriber
+#endif
 
        if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){
                zsock_set_subscribe (zmq_s, "");
@@ -214,20 +178,18 @@ celix_status_t 
pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
        ts->zmq_socket = zmq_s;
        ts->running = false;
        ts->nrSubscribers = 0;
-       ts->serializerSvc = NULL;
+       ts->serializer = best_serializer;
 
-       #ifdef BUILD_WITH_ZMQ_SECURITY
-       if (subEP->is_secure){
-               ts->zmq_cert = sub_cert;
-               ts->zmq_pub_cert = pub_cert;
-       }
-       #endif
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       ts->zmq_cert = sub_cert;
+       ts->zmq_pub_cert = pub_cert;
+#endif
 
        celixThreadMutex_create(&ts->socket_lock, NULL);
        celixThreadMutex_create(&ts->ts_lock,NULL);
        arrayList_create(&ts->sub_ep_list);
-       ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
-    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+
        arrayList_create(&ts->pendingConnections);
        arrayList_create(&ts->pendingDisconnections);
        celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -236,17 +198,17 @@ celix_status_t 
pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
        char filter[128];
        memset(filter,0,128);
        
if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT))
 == 0) {
-               // default scope, means that subscriber has not defined a scope 
property
-               snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
-                       (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                       PUBSUB_SUBSCRIBER_TOPIC,topic);
+               // default scope, means that subscriber has not defined a scope 
property
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic);
 
        } else {
-        snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
-                (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                PUBSUB_SUBSCRIBER_TOPIC,topic,
-                PUBSUB_SUBSCRIBER_SCOPE,scope);
-    }
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic,
+                               PUBSUB_SUBSCRIBER_SCOPE,scope);
+       }
        service_tracker_customizer_pt customizer = NULL;
        status += 
serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
        status += serviceTracker_createWithFilter(bundle_context, filter, 
customizer, &ts->tracker);
@@ -259,10 +221,7 @@ celix_status_t 
pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 
        sigaction(SIGUSR1,&actions,NULL);
 
-       if (status == CELIX_SUCCESS) {
-               *out=ts;
-               pubsub_topicSubscriptionSetSerializer(ts, serializer);
-       }
+       *out=ts;
 
        return status;
 }
@@ -276,8 +235,8 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        serviceTracker_destroy(ts->tracker);
        arrayList_clear(ts->sub_ep_list);
        arrayList_destroy(ts->sub_ep_list);
-       hashMap_destroy(ts->msgSerializerMapMap,false,false);
-    hashMap_destroy(ts->bundleMap,false,false);
+       /* TODO: Destroy all the serializer maps? */
+       hashMap_destroy(ts->servicesMap,false,false);
 
        celixThreadMutex_lock(&ts->pendingConnections_lock);
        arrayList_destroy(ts->pendingConnections);
@@ -289,18 +248,17 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
        celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
 
-       #ifdef BUILD_WITH_ZMQ_SECURITY
-       zcert_destroy(&(ts->zmq_cert));
-       zcert_destroy(&(ts->zmq_pub_cert));
-       #endif
-
-       celixThreadMutex_unlock(&ts->ts_lock);
-
        celixThreadMutex_lock(&ts->socket_lock);
        zsock_destroy(&(ts->zmq_socket));
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       zcert_destroy(&(ts->zmq_cert));
+       zcert_destroy(&(ts->zmq_pub_cert));
+#endif
        celixThreadMutex_unlock(&ts->socket_lock);
        celixThreadMutex_destroy(&ts->socket_lock);
 
+       celixThreadMutex_unlock(&ts->ts_lock);
+
 
        free(ts);
 
@@ -310,8 +268,6 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
        celix_status_t status = CELIX_SUCCESS;
 
-       //celixThreadMutex_lock(&ts->ts_lock);
-
        status = serviceTracker_open(ts->tracker);
 
        ts->running = true;
@@ -320,16 +276,12 @@ celix_status_t 
pubsub_topicSubscriptionStart(topic_subscription_pt ts){
                
status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts);
        }
 
-       //celixThreadMutex_unlock(&ts->ts_lock);
-
        return status;
 }
 
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
        celix_status_t status = CELIX_SUCCESS;
 
-       //celixThreadMutex_lock(&ts->ts_lock);
-
        ts->running = false;
 
        pthread_kill(ts->recv_thread.thread,SIGUSR1);
@@ -338,15 +290,13 @@ celix_status_t 
pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
        status = serviceTracker_close(ts->tracker);
 
-       //celixThreadMutex_unlock(&ts->ts_lock);
-
        return status;
 }
 
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL){
        celix_status_t status = CELIX_SUCCESS;
        celixThreadMutex_lock(&ts->socket_lock);
-       if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", 
pubURL) != 0){
+       if(!zsock_is(ts->zmq_socket) || 
zsock_connect(ts->zmq_socket,"%s",pubURL) != 0){
                status = CELIX_SERVICE_EXCEPTION;
        }
        celixThreadMutex_unlock(&ts->socket_lock);
@@ -355,28 +305,28 @@ celix_status_t 
pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 }
 
 celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL) {
-    celix_status_t status = CELIX_SUCCESS;
-    char *url = strdup(pubURL);
-    celixThreadMutex_lock(&ts->pendingConnections_lock);
-    arrayList_add(ts->pendingConnections, url);
-    celixThreadMutex_unlock(&ts->pendingConnections_lock);
-    return status;
+       celix_status_t status = CELIX_SUCCESS;
+       char *url = strdup(pubURL);
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_add(ts->pendingConnections, url);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       return status;
 }
 
 celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL) {
-     celix_status_t status = CELIX_SUCCESS;
-    char *url = strdup(pubURL);
-    celixThreadMutex_lock(&ts->pendingDisconnections_lock);
-    arrayList_add(ts->pendingDisconnections, url);
-    celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
-    return status;
+       celix_status_t status = CELIX_SUCCESS;
+       char *url = strdup(pubURL);
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_add(ts->pendingDisconnections, url);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       return status;
 }
 
 celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->socket_lock);
-       if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", 
pubURL) != 0){
+       if(!zsock_is(ts->zmq_socket) || 
zsock_disconnect(ts->zmq_socket,"%s",pubURL) != 0){
                status = CELIX_SERVICE_EXCEPTION;
        }
        celixThreadMutex_unlock(&ts->socket_lock);
@@ -388,9 +338,7 @@ celix_status_t 
pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, p
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        arrayList_add(ts->sub_ep_list,subEP);
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -401,9 +349,7 @@ celix_status_t 
pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        ts->nrSubscribers++;
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -413,22 +359,17 @@ celix_status_t 
pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        arrayList_removeElement(ts->sub_ep_list,subEP);
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
-
 }
 
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        ts->nrSubscribers--;
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -438,152 +379,114 @@ unsigned int 
pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
        return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-    //clear old
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-
-        }
-    }
-    ts->serializerSvc = serializerSvc;
-    //init new
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
-            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
-        }
-    }
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub){
+       return sub->sub_ep_list;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* svc){
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service){
        celix_status_t status = CELIX_SUCCESS;
+       topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-    if (ts->serializerSvc == svc) { //only act if svc removed is services used
-        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-        }
-        ts->serializerSvc = NULL;
-    }
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
+       if (!hashMap_containsKey(ts->servicesMap, service)) {
+               bundle_pt bundle = NULL;
+               hash_map_pt msgTypes = NULL;
 
-static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void* svc) {
-       celix_status_t status = CELIX_SUCCESS;
-       topic_subscription_pt ts = handle;
+               serviceReference_getBundle(reference, &bundle);
 
-       celixThreadMutex_lock(&ts->ts_lock);
-    if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
-        bundle_pt bundle = NULL;
-        serviceReference_getBundle(reference, &bundle);
-
-        if (ts->serializerSvc != NULL) {
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
-            if (map != NULL) {
-                hashMap_put(ts->msgSerializerMapMap, svc, map);
-                hashMap_put(ts->bundleMap, svc, bundle);
-            }
-        }
-    }
+               if(ts->serializer != NULL && bundle!=NULL){
+                       
ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+                       if(msgTypes != NULL){
+                               hashMap_put(ts->servicesMap, service, msgTypes);
+                               printf("PSA_ZMQ_TS: New subscriber 
registered.\n");
+                       }
+               }
+               else{
+                       printf("PSA_ZMQ_TS: Cannot register new subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
        celixThreadMutex_unlock(&ts->ts_lock);
-       printf("TS: New subscriber registered.\n");
-       return status;
 
+       return status;
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void* svc) {
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service){
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-    if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
-        pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializerMapMap, svc);
-        if (ts->serializerSvc != NULL){
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
-            hashMap_remove(ts->bundleMap, svc);
-            hashMap_remove(ts->msgSerializerMapMap, svc);
-        }
-    }
+       if (hashMap_containsKey(ts->servicesMap, service)) {
+               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+               if(msgTypes!=NULL && ts->serializer!=NULL){
+                       
ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+                       printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+               }
+               else{
+                       printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
        celixThreadMutex_unlock(&ts->ts_lock);
 
-       printf("TS: Subscriber unregistered.\n");
        return status;
 }
 
 
-static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
+static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
 
        pubsub_msg_header_pt first_msg_hdr = 
(pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializerMapMap);
-       while (hashMapIterator_hasNext(&iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+       while (hashMapIterator_hasNext(iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
                pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-               pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
 
-               pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, 
(void*)(uintptr_t )first_msg_hdr->type);
+               pubsub_msg_serializer_t *msgSer = 
hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type);
                if (msgSer == NULL) {
-                       printf("TS: Primary message %d not supported. NOT 
sending any part of the whole message.\n", first_msg_hdr->type);
-               } else {
+                       printf("PSA_ZMQ_TS: Primary message %d not supported. 
NOT sending any part of the whole message.\n",first_msg_hdr->type);
+               }
+               else{
                        void *msgInst = NULL;
-                       bool validVersion = checkVersion(msgSer->msgVersion, 
first_msg_hdr);
+                       bool validVersion = 
checkVersion(msgSer->msgVersion,first_msg_hdr);
+
                        if(validVersion){
-                               celix_status_t status = 
msgSer->deserialize(msgSer->handle, (const 
char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 
0, &msgInst);
+
+                               celix_status_t status = 
msgSer->deserialize(msgSer, (const void *) 
zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, 
&msgInst);
 
                                if (status == CELIX_SUCCESS) {
                                        bool release = true;
-
-                                       mp_handle_t* mp_handle = 
create_mp_handle(sub, map, msg_list);
+                                       mp_handle_pt mp_handle = 
create_mp_handle(msgTypes,msg_list);
                                        pubsub_multipart_callbacks_t 
mp_callbacks;
                                        mp_callbacks.handle = mp_handle;
                                        mp_callbacks.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForMsgType;
                                        mp_callbacks.getMultipart = 
pubsub_getMultipart;
                                        subsvc->receive(subsvc->handle, 
msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
 
-                                       if (release) {
-                                               msgSer->freeMsg(msgSer->handle, 
msgInst);
+                                       if(release){
+                                               
msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst);
                                        }
-                                       if (mp_handle!=NULL) {
+                                       if(mp_handle!=NULL){
                                                destroy_mp_handle(mp_handle);
                                        }
                                }
                                else{
-                                       printf("TS: Cannot deserialize msgType 
%s.\n", msgSer->msgName);
+                                       printf("PSA_ZMQ_TS: Cannot deserialize 
msgType %s.\n",msgSer->msgName);
                                }
 
-                       } else {
+                       }
+                       else{
                                int major=0,minor=0;
-                               version_getMajor(msgSer->msgVersion, &major);
-                               version_getMinor(msgSer->msgVersion, &minor);
-                               printf("TS: Version mismatch for primary 
message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole 
message.\n",
-                                          msgSer->msgName, major, minor, 
first_msg_hdr->major, first_msg_hdr->minor);
+                               version_getMajor(msgSer->msgVersion,&major);
+                               version_getMinor(msgSer->msgVersion,&minor);
+                               printf("PSA_ZMQ_TS: Version mismatch for 
primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the 
whole message.\n",
+                                               
msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
                        }
+
                }
        }
+       hashMapIterator_destroy(iter);
 
        int i = 0;
        for(;i<arrayList_size(msg_list);i++){
@@ -598,21 +501,21 @@ static void process_msg(topic_subscription_pt sub, 
array_list_pt msg_list) {
 }
 
 static void* zmq_recv_thread_func(void * arg) {
-    topic_subscription_pt sub = (topic_subscription_pt) arg;
+       topic_subscription_pt sub = (topic_subscription_pt) arg;
 
-    while (sub->running) {
+       while (sub->running) {
 
-        celixThreadMutex_lock(&sub->socket_lock);
+               celixThreadMutex_lock(&sub->socket_lock);
 
-        zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
-        if (headerMsg == NULL) {
-            if (errno == EINTR) {
-                //It means we got a signal and we have to exit...
-                printf("TS: header_recv thread for topic got a signal and will 
exit.\n");
-            } else {
-                perror("TS: header_recv thread");
-            }
-        } else {
+               zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
+               if (headerMsg == NULL) {
+                       if (errno == EINTR) {
+                               //It means we got a signal and we have to 
exit...
+                               printf("PSA_ZMQ_TS: header_recv thread for 
topic got a signal and will exit.\n");
+                       } else {
+                               perror("PSA_ZMQ_TS: header_recv thread");
+                       }
+               } else {
 
                        pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) 
zframe_data(headerMsg);
 
@@ -622,9 +525,9 @@ static void* zmq_recv_thread_func(void * arg) {
                                if (payloadMsg == NULL) {
                                        if (errno == EINTR) {
                                                //It means we got a signal and 
we have to exit...
-                                               printf("TS: payload_recv thread 
for topic got a signal and will exit.\n");
+                                               printf("PSA_ZMQ_TS: 
payload_recv thread for topic got a signal and will exit.\n");
                                        } else {
-                                               perror("TS: payload_recv");
+                                               perror("PSA_ZMQ_TS: 
payload_recv");
                                        }
                                        zframe_destroy(&headerMsg);
                                } else {
@@ -644,9 +547,9 @@ static void* zmq_recv_thread_func(void * arg) {
                                                if (h_msg == NULL) {
                                                        if (errno == EINTR) {
                                                                //It means we 
got a signal and we have to exit...
-                                                               printf("TS: 
h_recv thread for topic got a signal and will exit.\n");
+                                                               
printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n");
                                                        } else {
-                                                               perror("TS: 
h_recv");
+                                                               
perror("PSA_ZMQ_TS: h_recv");
                                                        }
                                                        break;
                                                }
@@ -655,9 +558,9 @@ static void* zmq_recv_thread_func(void * arg) {
                                                if (p_msg == NULL) {
                                                        if (errno == EINTR) {
                                                                //It means we 
got a signal and we have to exit...
-                                                               printf("TS: 
p_recv thread for topic got a signal and will exit.\n");
+                                                               
printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n");
                                                        } else {
-                                                               perror("TS: 
p_recv");
+                                                               
perror("PSA_ZMQ_TS: p_recv");
                                                        }
                                                        zframe_destroy(&h_msg);
                                                        break;
@@ -682,16 +585,16 @@ static void* zmq_recv_thread_func(void * arg) {
                        } //zframe_more(headerMsg)
                        else {
                                free(headerMsg);
-                               printf("TS: received message %u for topic %s 
without payload!\n", hdr->type, hdr->topic);
+                               printf("PSA_ZMQ_TS: received message %u for 
topic %s without payload!\n", hdr->type, hdr->topic);
                        }
 
-        } // headerMsg != NULL
-        celixThreadMutex_unlock(&sub->socket_lock);
-        connectPendingPublishers(sub);
-        disconnectPendingPublishers(sub);
-    } // while
+               } // headerMsg != NULL
+               celixThreadMutex_unlock(&sub->socket_lock);
+               connectPendingPublishers(sub);
+               disconnectPendingPublishers(sub);
+       } // while
 
-    return NULL;
+       return NULL;
 }
 
 static void connectPendingPublishers(topic_subscription_pt sub) {
@@ -715,7 +618,7 @@ static void 
disconnectPendingPublishers(topic_subscription_pt sub) {
 }
 
 static void sigusr1_sighandler(int signo){
-       printf("TS: Topic subscription being shut down...\n");
+       printf("PSA_ZMQ_TS: Topic subscription being shut down...\n");
        return;
 }
 
@@ -746,7 +649,7 @@ static int pubsub_getMultipart(void *handle, unsigned int 
msgTypeId, bool retain
                return -1;
        }
 
-       mp_handle_t* mp_handle = handle;
+       mp_handle_pt mp_handle = (mp_handle_pt)handle;
        msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
        if(entry!=NULL){
                entry->retain = retain;
@@ -762,59 +665,66 @@ static int pubsub_getMultipart(void *handle, unsigned int 
msgTypeId, bool retain
 
 }
 
-static mp_handle_t* create_mp_handle(topic_subscription_pt sub, 
pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) {
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt 
rcv_msg_list){
 
        if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart 
message
                return NULL;
        }
 
-       mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle));
-       mp_handle->map = map;
+       mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
+       mp_handle->svc_msg_db = svc_msg_db;
        mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
 
-       int i; //We skip the first message, it will be handle differently
-       for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) {
-               complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i);
+       int i=1; //We skip the first message, it will be handle differently
+       for(;i<arrayList_size(rcv_msg_list);i++){
+               complete_zmq_msg_pt c_msg = 
(complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
                pubsub_msg_header_pt header = 
(pubsub_msg_header_pt)zframe_data(c_msg->header);
 
-               pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, 
(void*)(uintptr_t)(header->type));
-               if (msgSer != NULL) {
+               pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, 
(void*)(uintptr_t)(header->type));
+
+               if (msgSer!= NULL) {
                        void *msgInst = NULL;
-                       bool validVersion = checkVersion(msgSer->msgVersion, 
header);
+
+                       bool validVersion = 
checkVersion(msgSer->msgVersion,header);
+
                        if(validVersion){
-                               //TODO make the getMultipart lazy?
-                               celix_status_t status = 
msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 
0, &msgInst);
+                               celix_status_t status = 
msgSer->deserialize(msgSer->handle, (const void*)zframe_data(c_msg->payload), 
0, &msgInst);
 
                                if(status == CELIX_SUCCESS){
                                        msg_map_entry_pt entry = 
calloc(1,sizeof(struct msg_map_entry));
                                        entry->msgInst = msgInst;
-                                       hashMap_put(mp_handle->rcv_msg_map, 
(void*)(uintptr_t)(header->type), entry);
+                                       hashMap_put(mp_handle->rcv_msg_map, 
(void*)(uintptr_t)header->type,entry);
                                }
                        }
                }
        }
+
        return mp_handle;
+
 }
 
-static void destroy_mp_handle(mp_handle_t* mp_handle){
+static void destroy_mp_handle(mp_handle_pt mp_handle){
 
        hash_map_iterator_pt iter = 
hashMapIterator_create(mp_handle->rcv_msg_map);
        while(hashMapIterator_hasNext(iter)){
                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
                unsigned int msgId = (unsigned 
int)(uintptr_t)hashMapEntry_getKey(entry);
                msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-               pubsub_msg_serializer_t* msgSer = 
hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId);
-               if (msgSer != NULL) {
-                       if (!msgEntry->retain) {
-                               msgSer->freeMsg(msgSer->handle, 
msgEntry->msgInst);
+               pubsub_msg_serializer_t* msgSer = 
hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId);
+
+               if(msgSer!=NULL){
+                       if(!msgEntry->retain){
+                               
msgSer->freeMsg(msgSer->handle,msgEntry->msgInst);
                        }
                }
                else{
-                       printf("TS: ERROR: Cannot find pubsub_message_type for 
msg %u, so cannot destroy it!\n", msgId);
+                       printf("PSA_ZMQ_TS: ERROR: Cannot find 
messageSerializer for msg %u, so cannot destroy it!\n",msgId);
                }
+
+               free(msgEntry);
        }
        hashMapIterator_destroy(iter);
 
-       hashMap_destroy(mp_handle->rcv_msg_map,true,true);
+       hashMap_destroy(mp_handle->rcv_msg_map,false,false);
        free(mp_handle);
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h 
b/pubsub/pubsub_common/public/include/pubsub_admin.h
index f7ab7e0..f24d825 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -31,13 +31,12 @@
 
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
-#include "pubsub_serializer.h"
 
 #define PSA_IP         "PSA_IP"
 #define PSA_ITF        "PSA_INTERFACE"
 #define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
 
-#define PSA_DEFAULT "zmq"
+#define PUBSUB_ADMIN_TYPE_KEY  "pubsub_admin.type"
 
 typedef struct pubsub_admin *pubsub_admin_pt;
 
@@ -53,12 +52,19 @@ struct pubsub_admin_service {
        celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* 
scope, char* topic);
        celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* 
scope, char* topic);
 
-       celix_status_t (*matchPublisher)(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score);
-       celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score);
-
-       celix_status_t (*setSerializer)(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
-       celix_status_t (*removeSerializer)(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
-
+       /* Match principle:
+        * - A full matching pubsub_admin gives 200 points
+        * - A full matching serializer gives 100 points
+        * - If QoS = sample
+        *              - fallback pubsub_admin order of selection is: udp_mc, 
zmq. Points allocation is 100,75.
+        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
+        * - If QoS = control
+        *              - fallback pubsub_admin order of selection is: 
zmq,udp_mc. Points allocation is 100,75.
+        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
+        * - If nothing is specified, QoS = sample is assumed, so the same 
score applies, just divided by two.
+        *
+        */
+       celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
 };
 
 typedef struct pubsub_admin_service *pubsub_admin_service_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin_match.h 
b/pubsub/pubsub_common/public/include/pubsub_admin_match.h
new file mode 100644
index 0000000..a366c34
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_admin_match.h
@@ -0,0 +1,27 @@
+/*
+ * pubsub_admin_match.h
+ *
+ *  Created on: Sep 4, 2017
+ *      Author: dn234
+ */
+
+#ifndef PUBSUB_ADMIN_MATCH_H_
+#define PUBSUB_ADMIN_MATCH_H_
+
+#include "celix_errno.h"
+#include "properties.h"
+#include "array_list.h"
+
+#include "pubsub_serializer.h"
+
+#define QOS_ATTRIBUTE_KEY      "attribute.qos"
+#define QOS_TYPE_SAMPLE                "sample"        /* A.k.a. unreliable 
connection */
+#define QOS_TYPE_CONTROL       "control"       /* A.k.a. reliable connection */
+
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE  200.0F
+#define SERIALIZER_FULL_MATCH_SCORE            100.0F
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
+
+#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h 
b/pubsub/pubsub_common/public/include/pubsub_common.h
index 46abd72..5dfd8fd 100644
--- a/pubsub/pubsub_common/public/include/pubsub_common.h
+++ b/pubsub/pubsub_common/public/include/pubsub_common.h
@@ -32,12 +32,12 @@
 #define PUBSUB_DISCOVERY_SERVICE               "pubsub_discovery"
 #define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
 
-#define PUBSUB_ANY_SUB_TOPIC                   "any"
+#define PUBSUB_ANY_SUB_TOPIC                   "any"
 
-#define PUBSUB_BUNDLE_ID                               "bundle.id"
+#define        PUBSUB_BUNDLE_ID                        "bundle.id"
 
-#define MAX_SCOPE_LEN                                  1024
-#define MAX_TOPIC_LEN                                  1024
+#define MAX_SCOPE_LEN                           1024
+#define MAX_TOPIC_LEN                          1024
 
 struct pubsub_msg_header{
        char topic[MAX_TOPIC_LEN];

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h 
b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
index 193b3fd..8a979eb 100644
--- a/pubsub/pubsub_common/public/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
@@ -28,6 +28,11 @@
 #define PUBSUB_ENDPOINT_H_
 
 #include "service_reference.h"
+#include "listener_hook_service.h"
+#include "properties.h"
+
+#include "publisher.h"
+#include "subscriber.h"
 
 struct pubsub_endpoint {
     char *frameworkUUID;
@@ -36,12 +41,15 @@ struct pubsub_endpoint {
     long serviceID;
     char* endpoint;
     bool is_secure;
+    properties_pt topic_props;
 };
 
 typedef struct pubsub_endpoint *pubsub_endpoint_pt;
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,pubsub_endpoint_pt* 
psEp);
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference,pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h 
b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index e9f9f6c..4489fa4 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -28,9 +28,12 @@
 #define PUBSUB_SERIALIZER_SERVICE_H_
 
 #include "service_reference.h"
+#include "hash_map.h"
 
 #include "pubsub_common.h"
 
+#define PUBSUB_SERIALIZER_TYPE_KEY     "pubsub_serializer.type"
+
 /**
  * There should be a pubsub_serializer_t
  * per msg type (msg id) per bundle
@@ -39,28 +42,24 @@
  * a serializer_map per bundle. Potentially using
  * the extender pattern.
  */
+
 typedef struct pubsub_msg_serializer {
-    void* handle;
-    unsigned int msgId;
-    const char* msgName;
-    version_pt msgVersion;
+       void* handle;
+       unsigned int msgId;
+       const char* msgName;
+       version_pt msgVersion;
 
-    celix_status_t (*serialize)(void* handle, const void* input, char** out, 
size_t* outLen);
-    celix_status_t (*deserialize)(void* handle, const char* input, size_t 
inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+       celix_status_t (*serialize)(void* handle, const void* input, void** 
out, size_t* outLen);
+       celix_status_t (*deserialize)(void* handle, const void* input, size_t 
inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+       void (*freeMsg)(void* handle, void* msg);
 
-    void (*freeMsg)(void* handle, void* msg);
 } pubsub_msg_serializer_t;
 
-typedef struct pubsub_msg_serializer_map {
-    bundle_pt bundle;
-    hash_map_pt serializers; //key = msg id (unsigned int), value = 
pubsub_serializer_t*
-} pubsub_msg_serializer_map_t;
-
 typedef struct pubsub_serializer_service {
        void* handle;
 
-       celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, 
pubsub_msg_serializer_map_t** out);
-    celix_status_t (*destroySerializerMap)(void* handle, 
pubsub_msg_serializer_map_t* map);
+       celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, 
hash_map_pt* serializerMap);
+       celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt 
serializerMap);
 
 } pubsub_serializer_service_t;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/log_helper.c 
b/pubsub/pubsub_common/public/src/log_helper.c
index dbd1cc3..7a63363 100644
--- a/pubsub/pubsub_common/public/src/log_helper.c
+++ b/pubsub/pubsub_common/public/src/log_helper.c
@@ -149,6 +149,9 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
         return status;
 }
 
+
+
+
 celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* 
message, ... )
 {
     celix_status_t status = CELIX_SUCCESS;
@@ -166,6 +169,7 @@ celix_status_t logHelper_log(log_helper_pt loghelper, 
log_level_t level, char* m
                int i = 0;
 
                for (; i < arrayList_size(loghelper->logServices); i++) {
+
                        log_service_pt logService = 
arrayList_get(loghelper->logServices, i);
 
                        if (logService != NULL) {
@@ -175,31 +179,31 @@ celix_status_t logHelper_log(log_helper_pt loghelper, 
log_level_t level, char* m
                }
 
                pthread_mutex_unlock(&loghelper->logListLock);
+       }
 
-               if (!logged && loghelper->stdOutFallback) {
-                       char *levelStr = NULL;
-
-                       switch (level) {
-                               case OSGI_LOGSERVICE_ERROR:
-                                       levelStr = "ERROR";
-                                       break;
-                               case OSGI_LOGSERVICE_WARNING:
-                                       levelStr = "WARNING";
-                                       break;
-                               case OSGI_LOGSERVICE_INFO:
-                                       levelStr = "INFO";
-                                       break;
-                               case OSGI_LOGSERVICE_DEBUG:
-                               default:
-                                       levelStr = "DEBUG";
-                                       break;
-                       }
 
-                       printf("%s: %s\n", levelStr, msg);
-               }
-       }
+    if (!logged && loghelper->stdOutFallback) {
+        char *levelStr = NULL;
+
+        switch (level) {
+            case OSGI_LOGSERVICE_ERROR:
+                levelStr = "ERROR";
+                break;
+            case OSGI_LOGSERVICE_WARNING:
+                levelStr = "WARNING";
+                break;
+            case OSGI_LOGSERVICE_INFO:
+                levelStr = "INFO";
+                break;
+            case OSGI_LOGSERVICE_DEBUG:
+            default:
+                levelStr = "DEBUG";
+                break;
+        }
+
+        printf("%s: %s\n", levelStr, msg);
+    }
 
-       va_end(listPointer);
 
        return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_admin_match.c 
b/pubsub/pubsub_common/public/src/pubsub_admin_match.c
new file mode 100644
index 0000000..bb555b7
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/pubsub_admin_match.c
@@ -0,0 +1,303 @@
+/*
+ * pubsub_admin_match.c
+
+ *
+ *  Created on: Sep 4, 2017
+ *      Author: dn234
+ */
+
+#include <string.h>
+#include "service_reference.h"
+
+#include "pubsub_admin.h"
+
+#include "pubsub_admin_match.h"
+
+#define KNOWN_PUBSUB_ADMIN_NUM 2
+#define KNOWN_SERIALIZER_NUM   2
+
+static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"udp_mc","zmq"};
+static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
+
+static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"zmq","udp_mc"};
+static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
+
+static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
+static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
+
+static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType);
+static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService);
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score){
+
+       celix_status_t status = CELIX_SUCCESS;
+       double final_score = 0;
+       int i = 0, j = 0;
+
+       const char *requested_admin_type                = NULL;
+       const char *requested_serializer_type   = NULL;
+       const char *requested_qos_type                  = NULL;
+
+       if(endpoint_props!=NULL){
+               requested_admin_type            = 
properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
+               requested_serializer_type       = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+               requested_qos_type                      = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+       }
+
+       /* Analyze the pubsub_admin */
+       if(requested_admin_type != NULL){ /* We got precise specification on 
the pubsub_admin we want */
+               
if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){
 //Full match
+                       final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
+               }
+       }
+       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected PSA */
+               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+                               
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+                                       final_score += 
qos_pubsub_admin_score[i];
+                                       break;
+                               }
+                       }
+               }
+               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+                               
if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+                                       final_score += 
qos_pubsub_admin_score[i];
+                                       break;
+                               }
+                       }
+               }
+               else{
+                       printf("Unknown QoS type '%s'\n",requested_qos_type);
+                       status = CELIX_ILLEGAL_ARGUMENT;
+               }
+       }
+       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
+               for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+                       
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+                               final_score += (qos_pubsub_admin_score[i]/2);
+                               break;
+                       }
+               }
+       }
+
+       char *serializer_type = NULL;
+       /* Analyze the serializers */
+       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
+               for(i=0;i<arrayList_size(serializerList);i++){
+                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
+                       get_serializer_type(svcRef, &serializer_type);
+                       if(serializer_type != NULL){
+                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+                                       final_score += 
SERIALIZER_FULL_MATCH_SCORE;
+                                       break;
+                               }
+                       }
+               }
+       }
+       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
+               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       ser_found = true;
+                                               }
+                                       }
+                               }
+                               if(ser_found){
+                                       final_score += qos_serializer_score[i];
+                               }
+                       }
+               }
+               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       ser_found = true;
+                                               }
+                                       }
+                               }
+                               if(ser_found){
+                                       final_score += qos_serializer_score[i];
+                               }
+                       }
+               }
+               else{
+                       printf("Unknown QoS type '%s'\n",requested_qos_type);
+                       status = CELIX_ILLEGAL_ARGUMENT;
+               }
+       }
+       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
+               bool ser_found = false;
+               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                               service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                               get_serializer_type(svcRef, &serializer_type);
+                               if(serializer_type != NULL){
+                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                               ser_found = true;
+                                       }
+                               }
+                       }
+                       if(ser_found){
+                               final_score += (qos_serializer_score[i]/2);
+                       }
+               }
+       }
+
+       *score = final_score;
+
+       printf("Score for pair <%s,%s> = 
%f\n",pubsub_admin_type,serializer_type,final_score);
+
+       return status;
+}
+
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
+       celix_status_t status = CELIX_SUCCESS;
+
+       int i = 0, j = 0;
+
+       const char *requested_serializer_type   = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+       const char *requested_qos_type                  = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+
+       service_reference_pt svcRef = NULL;
+       void *svc = NULL;
+
+       /* Analyze the serializers */
+       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
+               for(i=0;i<arrayList_size(serializerList);i++){
+                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
+                       char *serializer_type = NULL;
+                       get_serializer_type(svcRef, &serializer_type);
+                       if(serializer_type != NULL){
+                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+                                       manage_service_from_reference(svcRef, 
&svc,true);
+                                       if(svc==NULL){
+                                               printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                               status = 
CELIX_SERVICE_EXCEPTION;
+                                       }
+                                       *serSvc = svc;
+                                       break;
+                               }
+                       }
+               }
+       }
+       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
+               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       char *serializer_type = NULL;
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       
manage_service_from_reference(svcRef, &svc,true);
+                                                       if(svc==NULL){
+                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                                               status = 
CELIX_SERVICE_EXCEPTION;
+                                                       }
+                                                       else{
+                                                               *serSvc = svc;
+                                                               ser_found = 
true;
+                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       char *serializer_type = NULL;
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       
manage_service_from_reference(svcRef, &svc,true);
+                                                       if(svc==NULL){
+                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                                               status = 
CELIX_SERVICE_EXCEPTION;
+                                                       }
+                                                       else{
+                                                               *serSvc = svc;
+                                                               ser_found = 
true;
+                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+               else{
+                       printf("Unknown QoS type '%s'\n",requested_qos_type);
+                       status = CELIX_ILLEGAL_ARGUMENT;
+               }
+       }
+       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
+               bool ser_found = false;
+               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                               svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                               char *serializer_type = NULL;
+                               get_serializer_type(svcRef, &serializer_type);
+                               if(serializer_type != NULL){
+                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                               
manage_service_from_reference(svcRef, &svc,true);
+                                               if(svc==NULL){
+                                                       printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                                       status = 
CELIX_SERVICE_EXCEPTION;
+                                               }
+                                               else{
+                                                       *serSvc = svc;
+                                                       ser_found = true;
+                                                       printf("Selected %s 
serializer as best without any 
specification\n",qos_sample_serializer_prio_list[i]);
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       if(svc!=NULL && svcRef!=NULL){
+               manage_service_from_reference(svcRef, svc, false);
+       }
+
+       return status;
+}
+
+static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType){
+
+       const char *serType = NULL;
+       serviceReference_getProperty(svcRef, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType != NULL){
+               *serializerType = (char*)serType;
+       }
+       else{
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",svcRef);
+               *serializerType = NULL;
+       }
+}
+
+static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService){
+       bundle_context_pt context = NULL;
+       bundle_pt bundle = NULL;
+       serviceReference_getBundle(svcRef, &bundle);
+       bundle_getContext(bundle, &context);
+       if(getService){
+               bundleContext_getService(context, svcRef, svc);
+       }
+       else{
+               bundleContext_ungetService(context, svcRef, NULL);
+       }
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c 
b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
index ebb330e..f6776d5 100644
--- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -33,41 +33,101 @@
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
 #include "constants.h"
-#include "subscriber.h"
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* 
out) {
-    celix_status_t status = CELIX_SUCCESS;
+#include "pubsub_utils.h"
 
-    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
 
-    if (fwUUID != NULL) {
-        psEp->frameworkUUID = strdup(fwUUID);
-    }
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps);
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher);
 
-    if (scope != NULL) {
-        psEp->scope = strdup(scope);
-    }
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps){
 
-    if (topic != NULL) {
-        psEp->topic = strdup(topic);
-    }
+       if (fwUUID != NULL) {
+               psEp->frameworkUUID = strdup(fwUUID);
+       }
+
+       if (scope != NULL) {
+               psEp->scope = strdup(scope);
+       }
+
+       if (topic != NULL) {
+               psEp->topic = strdup(topic);
+       }
+
+       psEp->serviceID = serviceId;
+
+       if(endpoint != NULL) {
+               psEp->endpoint = strdup(endpoint);
+       }
+
+       if(topic_props != NULL){
+               if(cloneProps){
+                       properties_copy(topic_props, &(psEp->topic_props));
+               }
+               else{
+                       psEp->topic_props = topic_props;
+               }
+       }
+}
+
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher){
+
+       properties_pt topic_props = NULL;
+
+       bool isSystemBundle = false;
+       bundle_isSystemBundle(bundle, &isSystemBundle);
+       long bundleId = -1;
+       bundle_isSystemBundle(bundle, &isSystemBundle);
+       bundle_getBundleId(bundle,&bundleId);
+
+       if(isSystemBundle == false) {
 
-    psEp->serviceID = serviceId;
+               char *bundleRoot = NULL;
+               char* topicPropertiesPath = NULL;
+               bundle_getEntry(bundle, ".", &bundleRoot);
 
-    if (endpoint != NULL) {
-        psEp->endpoint = strdup(endpoint);
-    }
+               if(bundleRoot != NULL){
 
-       *out = psEp;
+                       asprintf(&topicPropertiesPath, 
"%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", 
topic);
+                       topic_props = properties_load(topicPropertiesPath);
+                       if(topic_props==NULL){
+                               printf("PSEP: Could not load properties for %s 
on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", 
topic,bundleId);
+                       }
 
-    return status;
+                       free(topicPropertiesPath);
+                       free(bundleRoot);
+               }
+       }
+
+       return topic_props;
+}
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp){
+       celix_status_t status = CELIX_SUCCESS;
+
+       *psEp = calloc(1, sizeof(**psEp));
+
+       pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, 
endpoint, topic_props, true);
+
+       return status;
 
 }
 
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference, pubsub_endpoint_pt* out){
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out){
        celix_status_t status = CELIX_SUCCESS;
 
-       pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp));
+       *out = calloc(1,sizeof(**out));
+
+       pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, 
in->serviceID, in->endpoint, in->topic_props, true);
+
+       return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference, pubsub_endpoint_pt* psEp, bool isPublisher){
+       celix_status_t status = CELIX_SUCCESS;
+
+       *psEp = calloc(1,sizeof(**psEp));
 
        bundle_pt bundle = NULL;
        bundle_context_pt ctxt = NULL;
@@ -85,49 +145,86 @@ celix_status_t 
pubsubEndpoint_createFromServiceReference(service_reference_pt re
        const char* serviceId = NULL;
        
serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
 
-       if(fwUUID!=NULL){
-               psEp->frameworkUUID = strdup(fwUUID);
-       }
+       /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
+       properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
 
-       if(scope!=NULL){
-               psEp->scope = strdup(scope);
-       } else {
-           psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
-       }
+       pubsubEndpoint_setFields(*psEp, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, 
strtol(serviceId,NULL,10), NULL, topic_props, false);
 
-       if(topic!=NULL){
-               psEp->topic = strdup(topic);
+       if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope 
|| !(*psEp)->topic) {
+               fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: 
incomplete description!.");
+               status = CELIX_BUNDLE_EXCEPTION;
        }
 
-       if(serviceId!=NULL){
-               psEp->serviceID = strtol(serviceId,NULL,10);
-       }
+       return status;
 
-       if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || 
!psEp->topic) {
-               fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: 
incomplete description!.");
-               status = CELIX_BUNDLE_EXCEPTION;
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher){
+       celix_status_t status = CELIX_SUCCESS;
+
+       const char* fwUUID=NULL;
+       
bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+       char* topic = pubsub_getTopicFromFilter(info->filter);
+       if(topic==NULL || fwUUID==NULL){
+               return CELIX_BUNDLE_EXCEPTION;
        }
 
-       if (status != CELIX_SUCCESS) {
-               pubsubEndpoint_destroy(psEp);
-       } else {
-               *out = psEp;
+       *psEp = calloc(1, sizeof(**psEp));
+
+       char* scope = pubsub_getScopeFromFilter(info->filter);
+       if(scope == NULL) {
+               scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
        }
 
-       return status;
+       bundle_pt bundle = NULL;
+       long bundleId = -1;
+       bundleContext_getBundle(info->context,&bundle);
 
+       bundle_getBundleId(bundle,&bundleId);
+
+       properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
+
+       /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
+       pubsubEndpoint_setFields(*psEp, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, 
topic_props, false);
+
+       free(topic);
+       free(scope);
+
+
+       return status;
 }
 
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
-       if (psEp != NULL) {
+
+       if(psEp->frameworkUUID!=NULL){
                free(psEp->frameworkUUID);
+               psEp->frameworkUUID = NULL;
+       }
+
+       if(psEp->scope!=NULL){
                free(psEp->scope);
+               psEp->scope = NULL;
+       }
+
+       if(psEp->topic!=NULL){
                free(psEp->topic);
+               psEp->topic = NULL;
+       }
+
+       if(psEp->endpoint!=NULL){
                free(psEp->endpoint);
+               psEp->endpoint = NULL;
        }
+
+       if(psEp->topic_props != NULL){
+               properties_destroy(psEp->topic_props);
+       }
+
        free(psEp);
 
        return CELIX_SUCCESS;
+
 }
 
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
@@ -138,7 +235,6 @@ bool pubsubEndpoint_equals(pubsub_endpoint_pt 
psEp1,pubsub_endpoint_pt psEp2){
                        (psEp1->serviceID == psEp2->serviceID) /*&&
                        ((psEp1->endpoint==NULL && 
psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
        );
-
 }
 
 char *createScopeTopicKey(const char* scope, const char* topic) {

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h 
b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
index d5be8d6..676a6ab 100644
--- a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
+++ b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
@@ -58,7 +58,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt 
node_discovery);
 celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, 
pubsub_endpoint_pt pubEP);
 celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, 
pubsub_endpoint_pt pubEP);
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, 
service_reference_pt reference, void **service);
 celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, 
service_reference_pt reference, void * service);
 celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, 
service_reference_pt reference, void * service);
 celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, 
service_reference_pt reference, void * service);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_common.c 
b/pubsub/pubsub_discovery/private/src/etcd_common.c
index a53a844..c757801 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_common.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_common.c
@@ -32,6 +32,7 @@
 #include "pubsub_discovery.h"
 #include "pubsub_discovery_impl.h"
 
+
 #define MAX_ROOTNODE_LENGTH            128
 #define MAX_LOCALNODE_LENGTH   4096
 #define MAX_FIELD_LENGTH               128

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c 
b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
index 13ba3aa..3c3a5a8 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -57,17 +57,17 @@ struct etcd_watcher {
        celix_thread_mutex_t watcherLock;
        celix_thread_t watcherThread;
 
-    char *scope;
+       char *scope;
        char *topic;
        volatile bool running;
 };
 
 struct etcd_writer {
-    pubsub_discovery_pt pubsub_discovery;
-    celix_thread_mutex_t localPubsLock;
-    array_list_pt localPubs;
-    volatile bool running;
-    celix_thread_t writerThread;
+       pubsub_discovery_pt pubsub_discovery;
+       celix_thread_mutex_t localPubsLock;
+       array_list_pt localPubs;
+       volatile bool running;
+       celix_thread_t writerThread;
 };
 
 
@@ -77,41 +77,41 @@ static celix_status_t 
etcdWatcher_getTopicRootPath(bundle_context_pt context, co
        const char* rootPath = NULL;
 
        if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, 
&rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-           snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, 
scope, topic);
+               snprintf(rootNode, rootNodeLen, "%s/%s/%s", 
DEFAULT_ETCD_ROOTPATH, scope, topic);
        } else {
-        snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
+               snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, 
topic);
        }
 
        return status;
 }
 
 static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* 
rootNode) {
-    celix_status_t status = CELIX_SUCCESS;
-    const char* rootPath = NULL;
+       celix_status_t status = CELIX_SUCCESS;
+       const char* rootPath = NULL;
 
-    if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) 
!= CELIX_SUCCESS) || (!rootPath)) {
-        strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
-    } else {
-        strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
-    }
+       if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, 
&rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
+               strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
+       } else {
+               strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
+       }
 
-    return status;
+       return status;
 }
 
 
 static void add_node(const char *key, const char *value, void* arg) {
-    pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
-    pubsub_endpoint_pt pubEP = NULL;
-    celix_status_t status = 
etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
-    if(!status && pubEP) {
-        pubsub_discovery_addNode(ps_discovery, pubEP);
-    }
+       pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
+       pubsub_endpoint_pt pubEP = NULL;
+       celix_status_t status = 
etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
+       if(!status && pubEP) {
+               pubsub_discovery_addNode(ps_discovery, pubEP);
+       }
 }
 
 static celix_status_t 
etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, 
const char *rootPath, long long * highestModified) {
        celix_status_t status = CELIX_SUCCESS;
        if(etcd_get_directory(rootPath, add_node, ps_discovery, 
highestModified)) {
-           status = CELIX_ILLEGAL_ARGUMENT;
+               status = CELIX_ILLEGAL_ARGUMENT;
        }
        return status;
 }
@@ -137,14 +137,14 @@ celix_status_t 
etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
 
        asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
        if(expr) {
-            int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, 
serviceId);
-            free(expr);
-            if (foundItems != 4) { // Could happen when a directory is 
removed, just don't process this.
-                    status = CELIX_ILLEGAL_STATE;
-            }
-            else{
-                    status = 
pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,pubEP);
-            }
+               int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, 
serviceId);
+               free(expr);
+               if (foundItems != 4) { // Could happen when a directory is 
removed, just don't process this.
+                       status = CELIX_ILLEGAL_STATE;
+               }
+               else{
+                       status = 
pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
+               }
        }
        return status;
 }
@@ -154,75 +154,75 @@ celix_status_t 
etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
  * changing discovery endpoint information within etcd.
  */
 static void* etcdWatcher_run(void* data) {
-    etcd_watcher_pt watcher = (etcd_watcher_pt) data;
-    time_t timeBeforeWatch = time(NULL);
-    char rootPath[MAX_ROOTNODE_LENGTH];
-    long long highestModified = 0;
-
-    pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
-    bundle_context_pt context = ps_discovery->context;
-
-    memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
-
-    //TODO: add topic to etcd key
-    etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, 
rootPath, MAX_ROOTNODE_LENGTH);
-    etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, 
&highestModified);
-
-    while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && 
watcher->running) {
-
-        char *rkey = NULL;
-        char *value = NULL;
-        char *preValue = NULL;
-        char *action = NULL;
-        long long modIndex;
-
-        celixThreadMutex_unlock(&watcher->watcherLock);
-
-        if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, 
&value, &rkey, &modIndex) == 0 && action != NULL) {
-            pubsub_endpoint_pt pubEP = NULL;
-            if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 
0)) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, 
rkey, value, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_addNode(ps_discovery, pubEP);
-                }
-            } else if (strcmp(action, "delete") == 0) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, 
rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_removeNode(ps_discovery, pubEP);
-                }
-            } else if (strcmp(action, "expire") == 0) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, 
rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_removeNode(ps_discovery, pubEP);
-                }
-            } else if (strcmp(action, "update") == 0) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, 
rkey, value, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_addNode(ps_discovery, pubEP);
-                }
-            } else {
-                fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: 
%s", action);
-            }
-            highestModified = modIndex;
-        } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
-            sleep(DEFAULT_ETCD_TTL / 4);
-        }
-
-        FREE_MEM(action);
-        FREE_MEM(value);
-        FREE_MEM(preValue);
-        FREE_MEM(rkey);
-
-        /* prevent busy waiting, in case etcd_watch returns false */
-
-
-        if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
-            timeBeforeWatch = time(NULL);
-        }
-
-    }
-
-    if (watcher->running == false) {
-        celixThreadMutex_unlock(&watcher->watcherLock);
-    }
-
-    return NULL;
+       etcd_watcher_pt watcher = (etcd_watcher_pt) data;
+       time_t timeBeforeWatch = time(NULL);
+       char rootPath[MAX_ROOTNODE_LENGTH];
+       long long highestModified = 0;
+
+       pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
+       bundle_context_pt context = ps_discovery->context;
+
+       memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
+
+       //TODO: add topic to etcd key
+       etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, 
rootPath, MAX_ROOTNODE_LENGTH);
+       etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, 
&highestModified);
+
+       while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) 
&& watcher->running) {
+
+               char *rkey = NULL;
+               char *value = NULL;
+               char *preValue = NULL;
+               char *action = NULL;
+               long long modIndex;
+
+               celixThreadMutex_unlock(&watcher->watcherLock);
+
+               if (etcd_watch(rootPath, highestModified + 1, &action, 
&preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+                       pubsub_endpoint_pt pubEP = NULL;
+                       if ((strcmp(action, "set") == 0) || (strcmp(action, 
"create") == 0)) {
+                               if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == 
CELIX_SUCCESS) {
+                                       pubsub_discovery_addNode(ps_discovery, 
pubEP);
+                               }
+                       } else if (strcmp(action, "delete") == 0) {
+                               if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) 
== CELIX_SUCCESS) {
+                                       
pubsub_discovery_removeNode(ps_discovery, pubEP);
+                               }
+                       } else if (strcmp(action, "expire") == 0) {
+                               if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) 
== CELIX_SUCCESS) {
+                                       
pubsub_discovery_removeNode(ps_discovery, pubEP);
+                               }
+                       } else if (strcmp(action, "update") == 0) {
+                               if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == 
CELIX_SUCCESS) {
+                                       pubsub_discovery_addNode(ps_discovery, 
pubEP);
+                               }
+                       } else {
+                               fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, 
"Unexpected action: %s", action);
+                       }
+                       highestModified = modIndex;
+               } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 
4)) {
+                       sleep(DEFAULT_ETCD_TTL / 4);
+               }
+
+               FREE_MEM(action);
+               FREE_MEM(value);
+               FREE_MEM(preValue);
+               FREE_MEM(rkey);
+
+               /* prevent busy waiting, in case etcd_watch returns false */
+
+
+               if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
+                       timeBeforeWatch = time(NULL);
+               }
+
+       }
+
+       if (watcher->running == false) {
+               celixThreadMutex_unlock(&watcher->watcherLock);
+       }
+
+       return NULL;
 }
 
 celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, 
bundle_context_pt context, const char *scope, const char *topic, 
etcd_watcher_pt *watcher) {
@@ -243,16 +243,18 @@ celix_status_t etcdWatcher_create(pubsub_discovery_pt 
pubsub_discovery, bundle_c
        (*watcher)->scope = strdup(scope);
        (*watcher)->topic = strdup(topic);
 
-    celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
 
-    celixThreadMutex_lock(&(*watcher)->watcherLock);
+       celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
+
+       celixThreadMutex_lock(&(*watcher)->watcherLock);
+
+       status = celixThread_create(&(*watcher)->watcherThread, NULL, 
etcdWatcher_run, *watcher);
+       if (status == CELIX_SUCCESS) {
+               (*watcher)->running = true;
+       }
 
-    status = celixThread_create(&(*watcher)->watcherThread, NULL, 
etcdWatcher_run, *watcher);
-    if (status == CELIX_SUCCESS) {
-       (*watcher)->running = true;
-    }
+       celixThreadMutex_unlock(&(*watcher)->watcherLock);
 
-    celixThreadMutex_unlock(&(*watcher)->watcherLock);
 
        return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c 
b/pubsub/pubsub_discovery/private/src/etcd_writer.c
index 687d802..1c423f3 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_writer.c
@@ -47,11 +47,11 @@
 #define DEFAULT_ETCD_TTL 30
 
 struct etcd_writer {
-    pubsub_discovery_pt pubsub_discovery;
-    celix_thread_mutex_t localPubsLock;
-    array_list_pt localPubs;
-    volatile bool running;
-    celix_thread_t writerThread;
+       pubsub_discovery_pt pubsub_discovery;
+       celix_thread_mutex_t localPubsLock;
+       array_list_pt localPubs;
+       volatile bool running;
+       celix_thread_t writerThread;
 };
 
 
@@ -60,38 +60,38 @@ static void* etcdWriter_run(void* data);
 
 
 etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
-    etcd_writer_pt writer = calloc(1, sizeof(*writer));
-    if(writer) {
-        celixThreadMutex_create(&writer->localPubsLock, NULL);
-        arrayList_create(&writer->localPubs);
-        writer->pubsub_discovery = disc;
-        writer->running = true;
-        celixThread_create(&writer->writerThread, NULL, etcdWriter_run, 
writer);
-    }
-    return writer;
+       etcd_writer_pt writer = calloc(1, sizeof(*writer));
+       if(writer) {
+               celixThreadMutex_create(&writer->localPubsLock, NULL);
+               arrayList_create(&writer->localPubs);
+               writer->pubsub_discovery = disc;
+               writer->running = true;
+               celixThread_create(&writer->writerThread, NULL, etcdWriter_run, 
writer);
+       }
+       return writer;
 }
 
 void etcdWriter_destroy(etcd_writer_pt writer) {
-    char dir[MAX_ROOTNODE_LENGTH];
-    const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-    writer->running = false;
-    celixThread_join(writer->writerThread, NULL);
-
-    celixThreadMutex_lock(&writer->localPubsLock);
-    for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
-        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
-        memset(dir,0,MAX_ROOTNODE_LENGTH);
-        
snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
-        etcd_del(dir);
-        pubsubEndpoint_destroy(pubEP);
-    }
-    arrayList_destroy(writer->localPubs);
-
-    celixThreadMutex_unlock(&writer->localPubsLock);
-    celixThreadMutex_destroy(&(writer->localPubsLock));
-
-    free(writer);
+       char dir[MAX_ROOTNODE_LENGTH];
+       const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+       writer->running = false;
+       celixThread_join(writer->writerThread, NULL);
+
+       celixThreadMutex_lock(&writer->localPubsLock);
+       for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
+               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
+               memset(dir,0,MAX_ROOTNODE_LENGTH);
+               
snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
+               etcd_del(dir);
+               pubsubEndpoint_destroy(pubEP);
+       }
+       arrayList_destroy(writer->localPubs);
+
+       celixThreadMutex_unlock(&writer->localPubsLock);
+       celixThreadMutex_destroy(&(writer->localPubsLock));
+
+       free(writer);
 }
 
 celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, 
pubsub_endpoint_pt pubEP, bool storeEP){
@@ -101,11 +101,11 @@ celix_status_t 
etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
                const char *fwUUID = NULL;
                bundleContext_getProperty(writer->pubsub_discovery->context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
                if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
-                   celixThreadMutex_lock(&writer->localPubsLock);
-                   pubsub_endpoint_pt p = NULL;
-                   
pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
-                   arrayList_add(writer->localPubs,p);
-                   celixThreadMutex_unlock(&writer->localPubsLock);
+                       celixThreadMutex_lock(&writer->localPubsLock);
+                       pubsub_endpoint_pt p = NULL;
+                       pubsubEndpoint_clone(pubEP, &p);
+                       arrayList_add(writer->localPubs,p);
+                       celixThreadMutex_unlock(&writer->localPubsLock);
                }
        }
 
@@ -138,52 +138,52 @@ celix_status_t 
etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
 }
 
 celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, 
pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-    char *key = NULL;
-
-    const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-    asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, 
pubEP->frameworkUUID, pubEP->serviceID);
-
-    celixThreadMutex_lock(&writer->localPubsLock);
-    for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
-        pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
-        if (pubsubEndpoint_equals(ep, pubEP)) {
-            arrayList_remove(writer->localPubs, i);
-            pubsubEndpoint_destroy(ep);
-            break;
-        }
-    }
-    celixThreadMutex_unlock(&writer->localPubsLock);
-
-    if (etcd_del(key)) {
-        printf("Failed to remove key %s from ETCD\n",key);
-        status = CELIX_ILLEGAL_ARGUMENT;
-    }
-    FREE_MEM(key);
-    return status;
+       celix_status_t status = CELIX_SUCCESS;
+       char *key = NULL;
+
+       const char *rootPath = 
etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+       asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, 
pubEP->frameworkUUID, pubEP->serviceID);
+
+       celixThreadMutex_lock(&writer->localPubsLock);
+       for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
+               pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
+               if (pubsubEndpoint_equals(ep, pubEP)) {
+                       arrayList_remove(writer->localPubs, i);
+                       pubsubEndpoint_destroy(ep);
+                       break;
+               }
+       }
+       celixThreadMutex_unlock(&writer->localPubsLock);
+
+       if (etcd_del(key)) {
+               printf("Failed to remove key %s from ETCD\n",key);
+               status = CELIX_ILLEGAL_ARGUMENT;
+       }
+       FREE_MEM(key);
+       return status;
 }
 
 static void* etcdWriter_run(void* data) {
-    etcd_writer_pt writer = (etcd_writer_pt)data;
-    while(writer->running) {
-          celixThreadMutex_lock(&writer->localPubsLock);
-          for(int i=0; i < arrayList_size(writer->localPubs); i++) {
-              
etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
-          }
-          celixThreadMutex_unlock(&writer->localPubsLock);
-          sleep(DEFAULT_ETCD_TTL / 2);
-    }
-
-    return NULL;
+       etcd_writer_pt writer = (etcd_writer_pt)data;
+       while(writer->running) {
+               celixThreadMutex_lock(&writer->localPubsLock);
+               for(int i=0; i < arrayList_size(writer->localPubs); i++) {
+                       
etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
+               }
+               celixThreadMutex_unlock(&writer->localPubsLock);
+               sleep(DEFAULT_ETCD_TTL / 2);
+       }
+
+       return NULL;
 }
 
 static const char* etcdWriter_getRootPath(bundle_context_pt context) {
-    const char* rootPath = NULL;
-    bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
-    if(rootPath == NULL) {
-        rootPath = DEFAULT_ETCD_ROOTPATH;
-    }
-    return rootPath;
+       const char* rootPath = NULL;
+       bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
+       if(rootPath == NULL) {
+               rootPath = DEFAULT_ETCD_ROOTPATH;
+       }
+       return rootPath;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/psd_activator.c 
b/pubsub/pubsub_discovery/private/src/psd_activator.c
index afbe282..89a517d 100644
--- a/pubsub/pubsub_discovery/private/src/psd_activator.c
+++ b/pubsub/pubsub_discovery/private/src/psd_activator.c
@@ -48,7 +48,7 @@ static celix_status_t createTMPublisherAnnounceTracker(struct 
activator *activat
        service_tracker_customizer_pt customizer = NULL;
 
        status = serviceTrackerCustomizer_create(activator->pubsub_discovery,
-                       pubsub_discovery_tmPublisherAnnounceAdding,
+                       NULL,
                        pubsub_discovery_tmPublisherAnnounceAdded,
                        pubsub_discovery_tmPublisherAnnounceModified,
                        pubsub_discovery_tmPublisherAnnounceRemoved,

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c 
b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
index 0c7d6c4..94a8e11 100644
--- a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -167,7 +167,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt 
ps_discovery) {
     hashMapIterator_destroy(iter);
     hashMap_destroy(ps_discovery->watchers, true, true);
     celixThreadMutex_unlock(&ps_discovery->watchersMutex);
-
     return status;
 }
 
@@ -293,7 +292,7 @@ celix_status_t pubsub_discovery_announcePublisher(void 
*handle, pubsub_endpoint_
        }
        free(pub_key);
        pubsub_endpoint_pt p = NULL;
-       
pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+       pubsubEndpoint_clone(pubEP, &p);
 
        arrayList_add(pubEP_list,p);
 
@@ -397,16 +396,6 @@ celix_status_t pubsub_discovery_uninterestedInTopic(void 
*handle, const char* sc
 
 /* pubsub_topology_manager tracker callbacks */
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, 
service_reference_pt reference, void **service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
-
-       status = bundleContext_getService(pubsub_discovery->context, reference, 
service);
-
-       return status;
-}
-
 celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, 
service_reference_pt reference, void * service) {
        celix_status_t status = CELIX_SUCCESS;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h 
b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
index 5299808..c36f20e 100644
--- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -24,8 +24,8 @@
  *  \copyright Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_SERIALIZER_IMPL_H_
-#define PUBSUB_SERIALIZER_IMPL_H_
+#ifndef PUBSUB_SERIALIZER_JSON_H_
+#define PUBSUB_SERIALIZER_JSON_H_
 
 #include "dyn_common.h"
 #include "dyn_type.h"
@@ -34,25 +34,22 @@
 
 #include "pubsub_serializer.h"
 
+#define PUBSUB_SERIALIZER_TYPE "json"
+
 typedef struct pubsub_serializer {
        bundle_context_pt bundle_context;
        log_helper_pt loghelper;
 } pubsub_serializer_t;
 
-typedef struct pubsub_msg_serializer_impl {
-    pubsub_msg_serializer_t msgSerializer;
-    dyn_message_type* dynMsg;
-} pubsub_msg_serializer_impl_t;
-
 celix_status_t pubsubSerializer_create(bundle_context_pt context, 
pubsub_serializer_t* *serializer);
 celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
 
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* 
serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, 
pubsub_msg_serializer_map_t* map);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* 
serializer, bundle_pt bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, 
hash_map_pt serializerMap);
 
 /* Start of serializer specific functions */
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* 
impl, const void* msg, char** out, size_t *outLen);
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* 
impl, const char* input, size_t inputLen, void **out);
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void 
*msg);
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* 
msgSerializer, const void* msg, void** out, size_t *outLen);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* 
msgSerializer, const void* input, size_t inputLen, void **out);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void 
*msg);
 
-#endif /* PUBSUB_SERIALIZER_IMPL_H_ */
+#endif /* PUBSUB_SERIALIZER_JSON_H_ */

Reply via email to