This is an automated email from the ASF dual-hosted git repository.

oipo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3df528d  Feature/pubsub websocket serialization (#338)
3df528d is described below

commit 3df528d41c6a49a377514e1360f1bcb7b39cebd3
Author: Michael de Lang <[email protected]>
AuthorDate: Fri Apr 23 14:38:20 2021 +0200

    Feature/pubsub websocket serialization (#338)
    
    * Fix serializing/deserializing json for pubsub
    
    * Add service ranking to wire protocols and make endpoint matching print errors if match is mandatory but cannot be made
    
    * Fix memory leak in RSA
    
    * Move current pubsub websocket implementation to v1
    
    * Reset v1 to original
    
    * Add v2 websocket
---
 bundles/pubsub/CMakeLists.txt                      |   3 +-
 bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt  |   2 +-
 .../v2/src/pubsub_tcp_topic_receiver.c             |   1 -
 .../pubsub_admin_websocket/{ => v1}/CMakeLists.txt |   0
 .../{ => v1}/src/psa_activator.c                   |   0
 .../{ => v1}/src/pubsub_psa_websocket_constants.h  |   0
 .../{ => v1}/src/pubsub_websocket_admin.c          |   0
 .../{ => v1}/src/pubsub_websocket_admin.h          |   0
 .../{ => v1}/src/pubsub_websocket_common.c         |   0
 .../{ => v1}/src/pubsub_websocket_common.h         |   0
 .../{ => v1}/src/pubsub_websocket_topic_receiver.c |   0
 .../{ => v1}/src/pubsub_websocket_topic_receiver.h |   0
 .../{ => v1}/src/pubsub_websocket_topic_sender.c   |   0
 .../{ => v1}/src/pubsub_websocket_topic_sender.h   |   0
 .../pubsub_admin_websocket/{ => v2}/CMakeLists.txt |  20 +-
 .../{ => v2}/src/psa_activator.c                   |   6 +-
 .../{ => v2}/src/pubsub_psa_websocket_constants.h  |   0
 .../{ => v2}/src/pubsub_websocket_admin.c          | 267 +++++++++++++--------
 .../{ => v2}/src/pubsub_websocket_admin.h          |  12 +
 .../{ => v2}/src/pubsub_websocket_common.c         |   0
 .../{ => v2}/src/pubsub_websocket_common.h         |   0
 .../{ => v2}/src/pubsub_websocket_topic_receiver.c | 138 +++++------
 .../{ => v2}/src/pubsub_websocket_topic_receiver.h |   6 +-
 .../{ => v2}/src/pubsub_websocket_topic_sender.c   | 121 +++++-----
 .../{ => v2}/src/pubsub_websocket_topic_sender.h   |   7 +-
 bundles/pubsub/test/CMakeLists.txt                 |  74 +++++-
 26 files changed, 388 insertions(+), 269 deletions(-)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 1a780a2..3a66b25 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -38,7 +38,8 @@ if (PUBSUB)
 
     option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
     if (BUILD_PUBSUB_PSA_WS)
-        add_subdirectory(pubsub_admin_websocket)
+        add_subdirectory(pubsub_admin_websocket/v1)
+        add_subdirectory(pubsub_admin_websocket/v2)
     endif (BUILD_PUBSUB_PSA_WS)
 
     add_subdirectory(pubsub_api)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
index f7455da..117e85a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
@@ -32,7 +32,7 @@ add_celix_bundle(celix_pubsub_admin_tcp_v2
 
 set_target_properties(celix_pubsub_admin_tcp_v2 PROPERTIES INSTALL_RPATH "$ORIGIN")
 target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::pubsub_spi Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::framework Celix::dfi Celix::log_helper)
+target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::framework Celix::log_helper)
 target_include_directories(celix_pubsub_admin_tcp_v2 PRIVATE src)
 # cmake find package UUID set the wrong include dir for OSX
 if (NOT APPLE)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 2990029..853e49d 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -23,7 +23,6 @@
 #include <unistd.h>
 #include <pubsub/subscriber.h>
 #include <memory.h>
-#include <pubsub_constants.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
 #include "pubsub_tcp_handler.h"
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt 
b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
copy to bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/psa_activator.c
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/psa_activator.c
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_psa_websocket_constants.h
similarity index 100%
copy from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
copy to 
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_psa_websocket_constants.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.c
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.c
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.h
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.h
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.c
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.c
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.h
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.h
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
similarity index 100%
copy from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
copy to 
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.h
similarity index 100%
copy from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
copy to 
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.h
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.c
similarity index 100%
copy from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
copy to 
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.c
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h 
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.h
similarity index 100%
copy from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
copy to 
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
similarity index 60%
rename from bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
rename to bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
index f6397e0..6557ddb 100644
--- a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
@@ -18,8 +18,8 @@
 find_package(Jansson REQUIRED)
 find_package(UUID REQUIRED)
 
-add_celix_bundle(celix_pubsub_admin_websocket
-    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+add_celix_bundle(celix_pubsub_admin_websocket_v2
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket_v2"
     VERSION "1.0.0"
     GROUP "Celix/PubSub"
     SOURCES
@@ -30,16 +30,16 @@ add_celix_bundle(celix_pubsub_admin_websocket
         src/pubsub_websocket_common.c
 )
 
-set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE
-        Celix::framework Celix::dfi Celix::log_helper Celix::utils
+set_target_properties(celix_pubsub_admin_websocket_v2 PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE
+        Celix::framework Celix::log_helper Celix::utils
         Celix::http_admin_api
 )
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_websocket PRIVATE
+target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_websocket_v2 PRIVATE
     src
 )
 
-install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
+install_celix_bundle(celix_pubsub_admin_websocket_v2 EXPORT celix COMPONENT pubsub)
+target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket_v2 ALIAS celix_pubsub_admin_websocket_v2)
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
similarity index 96%
rename from bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
index 2f4b8cc..159d8ed 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
@@ -18,6 +18,7 @@
  */
 
 #include <stdlib.h>
+#include <pubsub_message_serialization_service.h>
 
 #include "celix_api.h"
 #include "pubsub_serializer.h"
@@ -47,7 +48,7 @@ int psa_websocket_start(psa_websocket_activator_t *act, 
celix_bundle_context_t *
     act->cmdSvcId = -1L;
     act->serializersTrackerId = -1L;
 
-    act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_websocket");
+    act->logHelper = celix_logHelper_create(ctx, 
"celix_psa_admin_websocket_v2");
 
     act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
     celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : 
CELIX_BUNDLE_EXCEPTION;
@@ -55,8 +56,7 @@ int psa_websocket_start(psa_websocket_activator_t *act, 
celix_bundle_context_t *
     //track serializers (only json)
     if (status == CELIX_SUCCESS) {
         celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-        opts.filter.filter = "(pubsub.serializer=json)";
+        opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
         opts.filter.ignoreServiceLanguage = true;
         opts.callbackHandle = act->admin;
         opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_psa_websocket_constants.h
similarity index 100%
rename from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
rename to 
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_psa_websocket_constants.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
similarity index 68%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
index 9c4633a..8950fda 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
@@ -21,6 +21,8 @@
 #include <pubsub_endpoint.h>
 #include <pubsub_serializer.h>
 #include <ip_utils.h>
+#include <pubsub_message_serialization_service.h>
+#include <pubsub_matching.h>
 
 #include "pubsub_utils.h"
 #include "pubsub_websocket_admin.h"
@@ -50,7 +52,7 @@ struct pubsub_websocket_admin {
     bool verbose;
 
     struct {
-        celix_thread_mutex_t mutex;
+        celix_thread_rwlock_t mutex;
         hash_map_t *map; //key = svcId, value = 
psa_websocket_serializer_entry_t*
     } serializers;
 
@@ -71,16 +73,15 @@ struct pubsub_websocket_admin {
 
 };
 
-typedef struct psa_websocket_serializer_entry {
-    const char *serType;
-    long svcId;
-    pubsub_serializer_service_t *svc;
-} psa_websocket_serializer_entry_t;
-
 static celix_status_t 
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, 
pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint);
 static celix_status_t 
pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* 
psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint);
 
 
+static void pubsub_websocketAdmin_getSerType(void *handle, void *svc 
__attribute__((unused)), const celix_properties_t* props) {
+    const char** out = handle;
+    *out = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
+}
+
 pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t 
*ctx, celix_log_helper_t *logHelper) {
     pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
     psa->ctx = ctx;
@@ -92,8 +93,8 @@ pubsub_websocket_admin_t* 
pubsub_websocketAdmin_create(celix_bundle_context_t *c
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadMutex_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
@@ -112,8 +113,7 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t 
*psa) {
         return;
     }
 
-    //note assuming al psa register services and service tracker are removed.
-
+    //note assuming all psa register services and service tracker are removed.
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -138,13 +138,13 @@ void 
pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     iter = hashMapIterator_construct(psa->serializers.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_websocket_serializer_entry_t *entry = 
hashMapIterator_nextValue(&iter);
         free(entry);
     }
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadRwlock_unlock(&psa->serializers.mutex);
 
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
@@ -155,7 +155,7 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t 
*psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadMutex_destroy(&psa->serializers.mutex);
+    celixThreadRwlock_destroy(&psa->serializers.mutex);
     hashMap_destroy(psa->serializers.map, false, false);
 
     free(psa);
@@ -164,29 +164,48 @@ void 
pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
 void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props) {
     pubsub_websocket_admin_t *psa = handle;
 
-    const char *serType = celix_properties_get(props, 
PUBSUB_SERIALIZER_TYPE_KEY, NULL);
-    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
+    const char *serType = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
+    long msgId = celix_properties_getAsLong(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
+    const char *msgFqn = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+    const char *msgVersion = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+
+    if (serType == NULL || msgId == -1L || msgFqn == NULL) {
+        L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of 
the following properties: %s or %s or %s",
+               
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
 
-    if (serType == NULL) {
-        L_INFO("[PSA_WEBSOCKET] Ignoring serializer service without %s 
property", PUBSUB_SERIALIZER_TYPE_KEY);
+        L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn 
%s", serType, msgId, msgFqn);
         return;
     }
-
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    psa_websocket_serializer_entry_t *entry = 
hashMap_get(psa->serializers.map, (void*)svcId);
+    L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s", 
serType, msgId, msgFqn);
+
+    celixThreadRwlock_writeLock(&psa->serializers.mutex);
+    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
+    if(typeEntries == NULL) {
+        typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
+        hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), 
typeEntries);
+        L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s", 
psa->serializers.map, serType);
+    }
+    psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, 
(void*)msgId);
     if (entry == NULL) {
-        entry = calloc(1, sizeof(*entry));
-        entry->serType = serType;
-        entry->svcId = svcId;
+        entry = calloc(1, sizeof(psa_websocket_serializer_entry_t));
         entry->svc = svc;
-        hashMap_put(psa->serializers.map, (void*)svcId, entry);
+        entry->fqn = celix_utils_strdup(msgFqn);
+        entry->version = celix_utils_strdup(msgVersion);
+        hashMap_put(typeEntries, (void*)msgId, entry);
+        L_INFO("[PSA_WEBSOCKET_V2] entry added");
     }
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadRwlock_unlock(&psa->serializers.mutex);
 }
 
 void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props) {
     pubsub_websocket_admin_t *psa = handle;
-    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
+    const char *serType = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
+    long msgId = celix_properties_getAsLong(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
+
+    if(serType == NULL || msgId == -1) {
+        L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i", 
serType, msgId);
+        return;
+    }
 
     //remove serializer
     // 1) First find entry and
@@ -194,48 +213,59 @@ void pubsub_websocketAdmin_removeSerializerSvc(void 
*handle, void *svc, const ce
     // 3) loop and destroy all topic receivers using the serializer
     // Note that it is the responsibility of the topology manager to create 
new topic senders/receivers
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
-    psa_websocket_serializer_entry_t *entry = 
hashMap_remove(psa->serializers.map, (void*)svcId);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadRwlock_writeLock(&psa->serializers.mutex);
+    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
+    if(typeEntries != NULL) {
+        psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries, 
(void*)msgId);
+        free((void*)entry->fqn);
+        free((void*)entry->version);
+        free(entry);
 
-    if (entry != NULL) {
-        celixThreadMutex_lock(&psa->topicSenders.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            pubsub_websocket_topic_sender_t *sender = 
hashMapEntry_getValue(senderEntry);
-            if (sender != NULL && entry->svcId == 
pubsub_websocketTopicSender_serializerSvcId(sender)) {
-                char *key = hashMapEntry_getKey(senderEntry);
-                hashMapIterator_remove(&iter);
-                pubsub_websocketTopicSender_destroy(sender);
-                free(key);
+        // check if there are no remaining serializers for the given type. If 
not, remove all senders and receivers for this type.
+        if(hashMap_size(typeEntries) == 0) {
+            hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, 
serType), true, false);
+            celixThreadRwlock_unlock(&psa->serializers.mutex);
+
+            celixThreadMutex_lock(&psa->topicSenders.mutex);
+            hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *senderEntry = 
hashMapIterator_nextEntry(&iter);
+                pubsub_websocket_topic_sender_t *sender = 
hashMapEntry_getValue(senderEntry);
+                if (sender != NULL && strncmp(serType, 
pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) {
+                    char *key = hashMapEntry_getKey(senderEntry);
+                    hashMapIterator_remove(&iter);
+                    pubsub_websocketTopicSender_destroy(sender);
+                    free(key);
+                }
             }
-        }
-        celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-        celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        iter = hashMapIterator_construct(psa->topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            pubsub_websocket_topic_receiver_t *receiver = 
hashMapEntry_getValue(senderEntry);
-            if (receiver != NULL && entry->svcId == 
pubsub_websocketTopicReceiver_serializerSvcId(receiver)) {
-                char *key = hashMapEntry_getKey(senderEntry);
-                hashMapIterator_remove(&iter);
-                pubsub_websocketTopicReceiver_destroy(receiver);
-                free(key);
+            celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+            celixThreadMutex_lock(&psa->topicReceivers.mutex);
+            iter = hashMapIterator_construct(psa->topicReceivers.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *senderEntry = 
hashMapIterator_nextEntry(&iter);
+                pubsub_websocket_topic_receiver_t *receiver = 
hashMapEntry_getValue(senderEntry);
+                if (receiver != NULL && strncmp(serType, 
pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
+                    char *key = hashMapEntry_getKey(senderEntry);
+                    hashMapIterator_remove(&iter);
+                    pubsub_websocketTopicReceiver_destroy(receiver);
+                    free(key);
+                }
             }
+            celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+        } else {
+            celixThreadRwlock_unlock(&psa->serializers.mutex);
         }
-        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
-        free(entry);
+    } else {
+        celixThreadRwlock_unlock(&psa->serializers.mutex);
     }
 }
 
 celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t 
**topicProperties, double *outScore, long *outSerializerSvcId, long 
*outProtocolSvcId) {
     pubsub_websocket_admin_t *psa = handle;
-    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
+    L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
-    double score = pubsubEndpoint_matchPublisher(psa->ctx, svcRequesterBndId, 
svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, 
svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
                                                psa->qosSampleScore, 
psa->qosControlScore, psa->defaultScore,
                                                false, topicProperties, 
outSerializerSvcId, outProtocolSvcId);
     *outScore = score;
@@ -245,9 +275,9 @@ celix_status_t pubsub_websocketAdmin_matchPublisher(void 
*handle, long svcReques
 
 celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long 
svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t 
**topicProperties, double *outScore, long *outSerializerSvcId, long 
*outProtocolSvcId) {
     pubsub_websocket_admin_t *psa = handle;
-    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
+    L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
-    double score = pubsubEndpoint_matchSubscriber(psa->ctx, svcProviderBndId, 
svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+    double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, 
svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
                                                 psa->qosSampleScore, 
psa->qosControlScore, psa->defaultScore,
                                                 false, topicProperties, 
outSerializerSvcId, outProtocolSvcId);
     if (outScore != NULL) {
@@ -258,9 +288,9 @@ celix_status_t pubsub_websocketAdmin_matchSubscriber(void 
*handle, long svcProvi
 
 celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint, bool *outMatch) {
     pubsub_websocket_admin_t *psa = handle;
-    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
+    L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
-    bool match = pubsubEndpoint_match(psa->ctx, psa->log, endpoint, 
PUBSUB_WEBSOCKET_ADMIN_TYPE, false, NULL, NULL);
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, psa->log, endpoint, 
PUBSUB_WEBSOCKET_ADMIN_TYPE, false, NULL, NULL);
     if (outMatch != NULL) {
         *outMatch = match;
     }
@@ -280,17 +310,23 @@ celix_status_t 
pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-    celixThreadMutex_lock(&psa->serializers.mutex);
+    //get serializer type
+    const char *serType = NULL;
+    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+    opts.callbackHandle = &serType;
+    opts.useWithProperties = pubsub_websocketAdmin_getSerType;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    char filter[32];
+    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, 
serializerSvcId);
+    opts.filter.filter = filter;
+    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_websocket_topic_sender_t *sender = 
hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
-        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serializerSvcId);
-        if (serEntry != NULL) {
-            sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, 
scope, topic, serializerSvcId, serEntry->svc);
-        }
+        sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, 
topic, serType, psa);
         if (sender != NULL) {
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
-            const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, 
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
                                                 serType, NULL, NULL);
 
@@ -304,15 +340,14 @@ celix_status_t 
pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
             }
             hashMap_put(psa->topicSenders.map, key, sender);
         } else {
-            L_ERROR("[PSA_WEBSOCKET] Error creating a TopicSender");
+            L_ERROR("[PSA_WEBSOCKET_V2] Error creating a TopicSender");
             free(key);
         }
     } else {
         free(key);
-        L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicSender for 
scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
+        L_ERROR("[PSA_WEBSOCKET_V2] Cannot setup already existing TopicSender 
for scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
 
     if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
         *outPublisherEndpoint = newEndpoint;
@@ -337,7 +372,7 @@ celix_status_t 
pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
         free(mapKey);
         pubsub_websocketTopicSender_destroy(sender);
     } else {
-        L_ERROR("[PSA_WEBSOCKET] Cannot teardown TopicSender with scope/topic 
%s/%s. Does not exists", scope == NULL ? "(null)" : scope, topic);
+        L_ERROR("[PSA_WEBSOCKET_V2] Cannot teardown TopicSender with 
scope/topic %s/%s. Does not exists", scope == NULL ? "(null)" : scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     free(key);
@@ -350,20 +385,24 @@ celix_status_t 
pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
 
     celix_properties_t *newEndpoint = NULL;
 
+    //get serializer type
+    const char *serType = NULL;
+    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+    opts.callbackHandle = &serType;
+    opts.useWithProperties = pubsub_websocketAdmin_getSerType;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    char filter[32];
+    snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, 
serializerSvcId);
+    opts.filter.filter = filter;
+    celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     pubsub_websocket_topic_receiver_t *receiver = 
hashMap_get(psa->topicReceivers.map, key);
     if (receiver == NULL) {
-        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serializerSvcId);
-        if (serEntry != NULL) {
-            receiver = pubsub_websocketTopicReceiver_create(psa->ctx, 
psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
-        } else {
-            L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicSender 
%s/%s", scope == NULL ? "(null)" : scope, topic);
-        }
+        receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, 
scope, topic, topicProperties, serType, psa);
         if (receiver != NULL) {
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
-            const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
                                                 
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
 
@@ -377,15 +416,14 @@ celix_status_t 
pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
             }
             hashMap_put(psa->topicReceivers.map, key, receiver);
         } else {
-            L_ERROR("[PSA_WEBSOCKET] Error creating a TopicReceiver.");
+            L_ERROR("[PSA_WEBSOCKET_V2] Error creating a TopicReceiver.");
             free(key);
         }
     } else {
         free(key);
-        L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicReceiver 
for scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
+        L_ERROR("[PSA_WEBSOCKET_V2] Cannot setup already existing 
TopicReceiver for scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
 
     if (receiver != NULL && newEndpoint != NULL) {
         celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
@@ -540,19 +578,62 @@ celix_status_t 
pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
     return status;
 }
 
+/**
+ * Looks up the serializer entry stored for `msgId` under `serializationType`.
+ *
+ * The read lock on the admin's serializers.mutex is taken here and is still held
+ * when this function returns, whether or not an entry was found. It is dropped by
+ * pubsub_websocketAdmin_releaseSerializer(), so every call must be paired with a release.
+ *
+ * @param handle             The pubsub_websocket_admin_t instance.
+ * @param serializationType  Key into the admin's serializers map.
+ * @param msgId              Message id, used as key in the per-type map.
+ * @return The matching entry, or NULL if the type or the id is not present.
+ */
+psa_websocket_serializer_entry_t* pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
+    pubsub_websocket_admin_t *admin = handle;
+
+    celixThreadRwlock_readLock(&admin->serializers.mutex);
+    hash_map_t *entriesForType = hashMap_get(admin->serializers.map, serializationType);
+    if (entriesForType == NULL) {
+        return NULL; //lock intentionally stays held, see releaseSerializer
+    }
+    return hashMap_get(entriesForType, (void*)(uintptr_t)msgId);
+}
+
+/**
+ * Unlocks the admin's serializers.mutex that
+ * pubsub_websocketAdmin_acquireSerializerForMessageId() left held.
+ *
+ * @param handle      The pubsub_websocket_admin_t instance.
+ * @param serializer  Not used; releasing only drops the lock.
+ */
+void pubsub_websocketAdmin_releaseSerializer(void *handle, psa_websocket_serializer_entry_t* serializer) {
+    pubsub_websocket_admin_t *admin = handle;
+    (void)serializer;
+    celixThreadRwlock_unlock(&admin->serializers.mutex);
+}
+
+/**
+ * Resolves the message id stored for the message name `fqn` under `serializationType`.
+ *
+ * The lookup runs under the read lock on psa->serializers.mutex, which is released
+ * again before returning. A warning is logged only when the lookup fails.
+ *
+ * @param handle             The pubsub_websocket_admin_t instance.
+ * @param serializationType  Key into the admin's serializers map.
+ * @param fqn                Message name to match against each entry's fqn.
+ * @return The id (the map key of the matching entry), or -1 if the serialization
+ *         type is unknown or no entry carries this fqn.
+ */
+int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
+    pubsub_websocket_admin_t *psa = handle;
+    int64_t id = -1L;
+
+    celixThreadRwlock_readLock(&psa->serializers.mutex);
+    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
+    if (typeEntries != NULL) {
+        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
+        while (hashMapIterator_hasNext(&iterator)) {
+            //walk entries so key and value come from one step instead of a second lookup
+            hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iterator);
+            psa_websocket_serializer_entry_t *entry = hashMapEntry_getValue(mapEntry);
+            if (strncmp(fqn, entry->fqn, 1024*1024) == 0) {
+                //the map key is the message id
+                id = (uint32_t)(uintptr_t)hashMapEntry_getKey(mapEntry);
+                break;
+            }
+        }
+    }
+    celixThreadRwlock_unlock(&psa->serializers.mutex);
+
+    if (id < 0) {
+        //this runs for every received message, so only failed lookups are reported
+        L_WARN("[PSA_WEBSOCKET_V2] Cannot find message id for serialization type %s and fqn %s", serializationType, fqn);
+    }
+
+    return id;
+}
+
 bool pubsub_websocketAdmin_executeCommand(void *handle, const char 
*commandLine __attribute__((unused)), FILE *out, FILE *errStream 
__attribute__((unused))) {
     pubsub_websocket_admin_t *psa = handle;
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
-    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_websocket_topic_sender_t *sender = 
hashMapIterator_nextValue(&iter);
-        long serSvcId = pubsub_websocketTopicSender_serializerSvcId(sender);
-        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serSvcId);
-        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *serType = 
pubsub_websocketTopicSender_serializerType(sender);
         const char *scope = pubsub_websocketTopicSender_scope(sender);
         const char *topic = pubsub_websocketTopicSender_topic(sender);
         const char *url = pubsub_websocketTopicSender_url(sender);
@@ -561,18 +642,16 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, 
const char *commandLine
         fprintf(out, "   |- url             = %s\n", url);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadRwlock_unlock(&psa->serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
-    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_websocket_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter);
-        long serSvcId = 
pubsub_websocketTopicReceiver_serializerSvcId(receiver);
-        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serSvcId);
-        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *serType = 
pubsub_websocketTopicReceiver_serializerType(receiver);
         const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
         const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
         const char *urlEndp = pubsub_websocketTopicReceiver_url(receiver);
@@ -598,7 +677,7 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, 
const char *commandLine
         celix_arrayList_destroy(unconnected);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-    celixThreadMutex_unlock(&psa->serializers.mutex);
+    celixThreadRwlock_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
     return true;
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.h
similarity index 83%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.h
index bae6333..960c266 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.h
@@ -20,13 +20,21 @@
 #ifndef CELIX_PUBSUB_WEBSOCKET_ADMIN_H
 #define CELIX_PUBSUB_WEBSOCKET_ADMIN_H
 
+#include <stdint.h>
 #include <pubsub_admin_metrics.h>
 #include "celix_api.h"
 #include "celix_log_helper.h"
 #include "pubsub_psa_websocket_constants.h"
+#include <pubsub_message_serialization_service.h>
 
 typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
 
+/**
+ * One message serializer as stored in the admin's serializers map.
+ * NOTE(review): who owns the strings and the service pointer is not visible here - confirm
+ * against the add/remove serializer service callbacks.
+ */
+typedef struct psa_websocket_serializer_entry {
+    const char *fqn; //message name; matched against the requested fqn when resolving a message id
+    const char *version; //version string; the topic receiver parses it with celix_version_createVersionFromString
+    pubsub_message_serialization_service_t *svc; //service whose deserialize/freeDeserializedMsg the topic receiver calls
+} psa_websocket_serializer_entry_t;
+
 pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t 
*ctx, celix_log_helper_t *logHelper);
 void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
 
@@ -46,6 +54,10 @@ celix_status_t 
pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
 void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props);
 void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props);
 
+// Returns the serializer entry for serializationType/msgId, or NULL if there is none.
+// The read lock on the admin's serializers is left held on return, so always follow
+// up with pubsub_websocketAdmin_releaseSerializer(), also after a NULL result.
+psa_websocket_serializer_entry_t* 
pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char 
*serializationType, uint32_t msgId);
+// Drops the lock taken by pubsub_websocketAdmin_acquireSerializerForMessageId().
+void pubsub_websocketAdmin_releaseSerializer(void *handle, 
psa_websocket_serializer_entry_t* serializer);
+// Returns the message id stored for fqn under serializationType, or -1 if no entry matches.
+int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const 
char *serializationType, const char *fqn);
+
 bool pubsub_websocketAdmin_executeCommand(void *handle, const char 
*commandLine, FILE *outStream, FILE *errStream);
 
 #endif //CELIX_PUBSUB_WEBSOCKET_ADMIN_H
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.c
similarity index 100%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.c
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
similarity index 100%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
similarity index 87%
rename from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
rename to 
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index 7d8cd00..c93f078 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -30,6 +30,7 @@
 #include "pubsub_websocket_topic_receiver.h"
 #include "pubsub_psa_websocket_constants.h"
 #include "pubsub_websocket_common.h"
+#include "pubsub_websocket_admin.h"
 
 #include <uuid/uuid.h>
 #include <http_admin/api.h>
@@ -64,10 +65,10 @@ typedef struct pubsub_websocket_msg_entry {
 struct pubsub_websocket_topic_receiver {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    long serializerSvcId;
-    pubsub_serializer_service_t *serializer;
+    void *admin;
     char *scope;
     char *topic;
+    char *serType;
     char scopeAndTopicFilter[5];
     char *uri;
 
@@ -109,7 +110,6 @@ typedef struct psa_websocket_requested_connection_entry {
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
-    hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
     hash_map_t *subscriberServices; //key = servide id, value = 
pubsub_subscriber_t*
     bool initialized; //true if the init function is called through the 
receive thread
 } psa_websocket_subscriber_entry_t;
@@ -120,7 +120,6 @@ static void 
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
 static void* psa_websocket_recvThread(void * data);
 static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
 *receiver);
 static void 
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t 
*receiver);
-static void *psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t 
*msg_type_id_map);
 
 static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, 
void *handle);
 static int psa_websocketTopicReceiver_data(struct mg_connection *connection, 
int op_code, char *data, size_t length, void *handle);
@@ -132,15 +131,15 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
                                                               const char 
*scope,
                                                               const char 
*topic,
                                                               const 
celix_properties_t *topicProperties,
-                                                              long 
serializerSvcId,
-                                                              
pubsub_serializer_service_t *serializer) {
+                                                              const char 
*serType,
+                                                              void *admin) {
     pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
-    receiver->serializerSvcId = serializerSvcId;
-    receiver->serializer = serializer;
+    receiver->serType = celix_utils_strdup(serType);
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
+    receiver->admin = admin;
     psa_websocket_setScopeAndTopicFilter(scope, topic, 
receiver->scopeAndTopicFilter);
 
     receiver->uri = psa_websocket_createURI(scope, topic);
@@ -266,7 +265,6 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
-                
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
                 hashMap_destroy(entry->subscriberServices, false, false);
                 free(entry);
             }
@@ -311,6 +309,7 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         free(receiver->uri);
         free(receiver->scope);
         free(receiver->topic);
+        free(receiver->serType);
     }
     free(receiver);
 }
@@ -325,8 +324,8 @@ const char* 
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t
     return receiver->uri;
 }
 
-long 
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t 
*receiver) {
-    return receiver->serializerSvcId;
+const char 
*pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t 
*receiver) {
+    return receiver->serType;
 }
 
 void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t 
*unconnectedUrls) {
@@ -417,24 +416,14 @@ static void 
pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_websocket_subscriber_entry_t *entry = 
hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-    } else {
+    if (entry == NULL) {
         //new create entry
         entry = calloc(1, sizeof(*entry));
         entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
         entry->initialized = false;
-        hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-
-        int rc = 
receiver->serializer->createSerializerMap(receiver->serializer->handle, 
(celix_bundle_t*)bnd, &entry->msgTypes);
-
-        if (rc == 0) {
-            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-        } else {
-            L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for 
TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, 
receiver->topic);
-            free(entry);
-        }
+        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
     }
+    hashMap_put(entry->subscriberServices, (void*)svcId, svc);
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
@@ -453,74 +442,67 @@ static void 
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
     if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
         //remove entry
         hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        int rc = 
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
-        if (rc != 0) {
-            L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for 
TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope, 
receiver->topic);
-        }
         hashMap_destroy(entry->subscriberServices, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void * psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t 
*msg_type_id_map) {
-    void *msgTypeId = NULL;
-    if(fqn != NULL && msg_type_id_map != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(msg_type_id_map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
-            pubsub_msg_serializer_t *serializer = hashMapEntry_getValue(entry);
-            if(strcmp(serializer->msgName, fqn) == 0) {
-                msgTypeId =  hashMapEntry_getKey(entry);
-                return msgTypeId;
-            }
-        }
+/**
+ * Deserializes one received payload and hands it to every subscriber service of `entry`.
+ *
+ * The message id is resolved from hdr->id (used as the message fqn) through the admin,
+ * then the matching serializer entry is acquired; it is released again on every path
+ * after the acquire. Messages with an unknown fqn or serializer are dropped with a
+ * warning; messages that fail the version check are dropped silently.
+ *
+ * @param receiver     Topic receiver the message arrived on (provides admin, serType, scope/topic).
+ * @param entry        Subscriber entry whose subscriberServices receive the message.
+ * @param hdr          Message header; hdr->id is the fqn, the version fields go to the version check.
+ * @param payload      Serialized message bytes.
+ * @param payloadSize  Length of payload in bytes.
+ */
+static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) {
+    //NOTE receiver->subscribers.mutex locked
+
+    int64_t msgTypeId = pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin, receiver->serType, hdr->id);
+
+    if(msgTypeId < 0) {
+        //the id is always negative here, so report the fqn that could not be resolved instead
+        L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot find msg type id for fqn %s with serType %s for scope/topic %s/%s", hdr->id, receiver->serType, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+        return;
+    }
 
-    return msgTypeId;
-}
+    psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, msgTypeId);
 
-static inline void 
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, 
psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, 
const char* payload, size_t payloadSize) {
-    //NOTE receiver->subscribers.mutex locked
-    void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, 
entry->msgTypes);
-    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
-
-    if (msgSer!= NULL && msgTypeId != 0) {
-        void *deSerializedMsg = NULL;
-        bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion, 
hdr);
-        if (validVersion) {
-            struct iovec deSerializeBuffer;
-            deSerializeBuffer.iov_base = (void *)payload;
-            deSerializeBuffer.iov_len  = payloadSize;
-            celix_status_t status = msgSer->deserialize(msgSer->handle, 
&deSerializeBuffer, 0, &deSerializedMsg);
-
-            if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices);
-                bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = 
hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deSerializedMsg, NULL, &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more 
receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, 
&deSerializeBuffer, 0, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg 
type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? 
"(null)" : receiver->scope, receiver->topic);
-                            break;
-                        }
-                        release = true;
+    if(serializer == NULL) {
+        //acquire takes the lock even when nothing is found, so release before bailing out
+        pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
+        L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot deserialize message with serType %s msg type id %lli for scope/topic %s/%s", receiver->serType, (long long)msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+        return;
+    }
+
+    void *deSerializedMsg = NULL;
+
+    celix_version_t* version = celix_version_createVersionFromString(serializer->version);
+    bool validVersion = psa_websocket_checkVersion(version, hdr);
+    celix_version_destroy(version);
+    if (validVersion) {
+        struct iovec deSerializeBuffer;
+        deSerializeBuffer.iov_base = (void *)payload;
+        deSerializeBuffer.iov_len  = payloadSize;
+        celix_status_t status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+
+        if (status == CELIX_SUCCESS) {
+            hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+            bool release = true;
+            while (hashMapIterator_hasNext(&iter)) {
+                pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                svc->receive(svc->handle, serializer->fqn, msgTypeId, deSerializedMsg, NULL, &release);
+                if (!release && hashMapIterator_hasNext(&iter)) {
+                    //receive function has taken ownership and still more receive function to come ..
+                    //deserialize again for new message
+                    status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+                    if (status != CELIX_SUCCESS) {
+                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                        break;
                     }
+                    release = true;
                 }
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, 
deSerializedMsg);
-                }
-            } else {
-                L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
             }
+            if (release) {
+                serializer->svc->freeDeserializedMsg(serializer->svc->handle, deSerializedMsg);
+            }
+        } else {
+            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
-    } else {
-        L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X, 
fqn %s", msgTypeId, hdr->id);
     }
+
+    pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
 }
 
 static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, 
const char *msg, size_t msgSize) {
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
similarity index 92%
rename from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
rename to 
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
index 4df42a0..55d5255 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
@@ -30,15 +30,15 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        long serializerSvcId,
-        pubsub_serializer_service_t *serializer);
+        const char *serType,
+        void *admin);
 void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t 
*receiver);
 
 const char* 
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t 
*receiver);
 const char* 
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t 
*receiver);
 const char* 
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t *receiver);
+// Returns the serialization type string held by the receiver.
+const char *pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t *receiver);
 
-long 
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t 
*receiver);
 void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t 
*unconnectedUrls);
 
 void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort);
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
similarity index 77%
rename from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
rename to 
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index ac3108c..98a1ad7 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -28,11 +28,11 @@
 #include "pubsub_websocket_topic_sender.h"
 #include "pubsub_psa_websocket_constants.h"
 #include "pubsub_websocket_common.h"
-#include <uuid/uuid.h>
 #include <jansson.h>
 #include "celix_constants.h"
 #include "http_admin/api.h"
 #include "civetweb.h"
+#include "pubsub_websocket_admin.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 
@@ -48,11 +48,11 @@
 struct pubsub_websocket_topic_sender {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    long serializerSvcId;
-    pubsub_serializer_service_t *serializer;
 
+    void *admin;
     char *scope;
     char *topic;
+    char *serType;
     char scopeAndTopicFilter[5];
     char *uri;
 
@@ -73,16 +73,13 @@ struct pubsub_websocket_topic_sender {
 
 typedef struct psa_websocket_send_msg_entry {
     pubsub_websocket_msg_header_t header; //partially filled header (only 
seqnr and time needs to be updated per send)
-    pubsub_msg_serializer_t *msgSer;
-    celix_thread_mutex_t sendLock; //protects send & header(.seqNr)
+    uint32_t type; //msg type id (hash of fqn)
 } psa_websocket_send_msg_entry_t;
 
 typedef struct psa_websocket_bounded_service_entry {
     pubsub_websocket_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
-    hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
-    hash_map_t *msgTypeIds; //key = msg name, value = msg type id
     hash_map_t *msgEntries; //key = msg type id, value = 
psa_websocket_send_msg_entry_t
     int getCount;
 } psa_websocket_bounded_service_entry_t;
@@ -102,15 +99,22 @@ pubsub_websocket_topic_sender_t* 
pubsub_websocketTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        long serializerSvcId,
-        pubsub_serializer_service_t *ser) {
+        const char *serType,
+        void *admin) {
     pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serializerSvcId = serializerSvcId;
-    sender->serializer = ser;
+    sender->serType = celix_utils_strdup(serType);
+
+    if(sender->serType == NULL) {
+        L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType");
+        free(sender);
+        return NULL;
+    }
+
     psa_websocket_setScopeAndTopicFilter(scope, topic, 
sender->scopeAndTopicFilter);
     sender->uri = psa_websocket_createURI(scope, topic);
+    sender->admin = admin;
 
     if (sender->uri != NULL) {
         celix_properties_t *props = celix_properties_create();
@@ -171,8 +175,6 @@ void 
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
-                
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
-
                 hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
                 while (hashMapIterator_hasNext(&iter2)) {
                     psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
@@ -196,14 +198,11 @@ void 
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         }
         free(sender->topic);
         free(sender->uri);
+        free(sender->serType);
         free(sender);
     }
 }
 
-long 
pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t 
*sender) {
-    return sender->serializerSvcId;
-}
-
 const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t 
*sender) {
     return sender->scope;
 }
@@ -216,9 +215,16 @@ const char* 
pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
     return sender->uri;
 }
 
+/**
+ * Returns the serialization type string held by the sender.
+ * The string is the sender's own copy (duplicated in create, freed in destroy),
+ * so the returned pointer must not be freed and is only valid while the sender exists.
+ */
+const char* 
pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t 
*sender) {
+    return sender->serType;
+}
+
 static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* 
msgType, unsigned int* msgTypeId) {
     psa_websocket_bounded_service_entry_t *entry = 
(psa_websocket_bounded_service_entry_t *) handle;
-    *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds, 
msgType);
+    //Resolve the id for msgType through the admin; a negative result means the name is unknown.
+    int64_t rc = pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serType, msgType);
+    if(rc < 0) {
+        //never report success with the out parameter left unwritten
+        *msgTypeId = 0;
+        return -1;
+    }
+    *msgTypeId = (unsigned int)rc;
     return 0;
 }
 
@@ -227,7 +233,7 @@ static void* psa_websocket_getPublisherService(void 
*handle, const celix_bundle_
     long bndId = celix_bundle_getId(requestingBundle);
 
     celixThreadMutex_lock(&sender->boundedServices.mutex);
-    psa_websocket_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void*)bndId);
+    psa_websocket_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
     if (entry != NULL) {
         entry->getCount += 1;
     } else {
@@ -236,33 +242,10 @@ static void* psa_websocket_getPublisherService(void 
*handle, const celix_bundle_
         entry->parent = sender;
         entry->bndId = bndId;
         entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->msgTypeIds = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-
-        int rc = 
sender->serializer->createSerializerMap(sender->serializer->handle, 
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
-        if (rc == 0) {
-            hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgTypes);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *hashMapEntry = 
hashMapIterator_nextEntry(&iter);
-                void *key = hashMapEntry_getKey(hashMapEntry);
-                psa_websocket_send_msg_entry_t *sendEntry = calloc(1, 
sizeof(*sendEntry));
-                sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
-                sendEntry->header.id = sendEntry->msgSer->msgName;
-                int major;
-                int minor;
-                version_getMajor(sendEntry->msgSer->msgVersion, &major);
-                version_getMinor(sendEntry->msgSer->msgVersion, &minor);
-                sendEntry->header.major = (uint8_t)major;
-                sendEntry->header.minor = (uint8_t)minor;
-                hashMap_put(entry->msgEntries, key, sendEntry);
-                hashMap_put(entry->msgTypeIds, 
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) 
sendEntry->msgSer->msgId);
-            }
-            entry->service.handle = entry;
-            entry->service.localMsgTypeIdForMsgType = 
psa_websocket_localMsgTypeIdForMsgType;
-            entry->service.send = psa_websocket_topicPublicationSend;
-            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
-        } else {
-            L_ERROR("Error creating serializer map for websocket TopicSender 
%s/%s", sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-        }
+        entry->service.handle = entry;
+        entry->service.localMsgTypeIdForMsgType = 
psa_websocket_localMsgTypeIdForMsgType;
+        entry->service.send = psa_websocket_topicPublicationSend;
+        hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 
@@ -281,10 +264,6 @@ static void psa_websocket_ungetPublisherService(void 
*handle, const celix_bundle
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void*)bndId);
-        int rc = 
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
-        if (rc != 0) {
-            L_ERROR("Error destroying publisher service, serializer not 
available / cannot get msg serializer map\n");
-        }
 
         hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgEntries);
         while (hashMapIterator_hasNext(&iter)) {
@@ -293,7 +272,6 @@ static void psa_websocket_ungetPublisherService(void 
*handle, const celix_bundle
         }
         hashMap_destroy(entry->msgEntries, false, false);
 
-        hashMap_destroy(entry->msgTypeIds, true, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -303,25 +281,44 @@ static int psa_websocket_topicPublicationSend(void* 
handle, unsigned int msgType
     int status = CELIX_SERVICE_EXCEPTION;
     psa_websocket_bounded_service_entry_t *bound = handle;
     pubsub_websocket_topic_sender_t *sender = bound->parent;
+
+    psa_websocket_serializer_entry_t *serializer = 
pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin, 
sender->serType, msgTypeId);
+
+    if(serializer == NULL) {
+        pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
+        L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with 
serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId, 
sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+        return CELIX_SERVICE_EXCEPTION;
+    }
+    
     psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, 
(void *) (uintptr_t) (msgTypeId));
 
-    if (sender->sockConnection != NULL && entry != NULL) {
+    if(entry == NULL) {
+        entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t));
+        entry->type = msgTypeId;
+        entry->header.id = serializer->fqn;
+        celix_version_t* version = 
celix_version_createVersionFromString(serializer->version);
+        entry->header.major = (uint8_t)celix_version_getMajor(version);
+        entry->header.minor = (uint8_t)celix_version_getMinor(version);
+        entry->header.seqNr = 0;
+        celix_version_destroy(version);
+        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+    }
+
+    if (sender->sockConnection != NULL) {
         delay_first_send_for_late_joiners(sender);
         size_t serializedOutputLen = 0;
         struct iovec* serializedOutput = NULL;
-        status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
+        status = serializer->svc->serialize(serializer->svc->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
 
         if (status == CELIX_SUCCESS /*ser ok*/) {
             json_error_t jsError;
-            unsigned char *hdrEncoded = 
calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char));
-
-            celixThreadMutex_lock(&entry->sendLock);
 
             json_t *jsMsg = json_object();
             json_object_set_new_nocheck(jsMsg, "id", 
json_string(entry->header.id));
             json_object_set_new_nocheck(jsMsg, "major", 
json_integer(entry->header.major));
             json_object_set_new_nocheck(jsMsg, "minor", 
json_integer(entry->header.minor));
-            json_object_set_new_nocheck(jsMsg, "seqNr", 
json_integer(entry->header.seqNr++));
+            uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1, 
__ATOMIC_RELAXED);
+            json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr));
 
             json_t *jsData;
             jsData = json_loadb((const char *)serializedOutput->iov_base, 
serializedOutput->iov_len, 0, &jsError);
@@ -339,21 +336,19 @@ static int psa_websocket_topicPublicationSend(void* 
handle, unsigned int msgType
             } else {
                 L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket, serialized 
data corrupt. Error(%d;%d;%d): %s", jsError.column, jsError.line, 
jsError.position, jsError.text);
             }
-            celixThreadMutex_unlock(&entry->sendLock);
 
             json_decref(jsMsg); //Decrease ref count means freeing the object
-            free(hdrEncoded);
-            entry->msgSer->freeSerializeMsg(entry->msgSer->handle, 
serializedOutput, serializedOutputLen);
+            serializer->svc->freeSerializedMsg(serializer->svc->handle, 
serializedOutput, serializedOutputLen);
         } else {
             L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for 
scope/topic %s/%s",
-                   entry->msgSer->msgName, sender->scope == NULL ? "(null)" : 
sender->scope, sender->topic);
+                   entry->header.id, sender->scope == NULL ? "(null)" : 
sender->scope, sender->topic);
         }
-    } else if (entry == NULL){
-        L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i 
for scope/topic %s/%s", msgTypeId, sender->scope == NULL ? "(null)" : 
sender->scope, sender->topic);
     } else { // when (sender->sockConnection == NULL) we dont have a client, 
but we do have a valid entry
        status = CELIX_SUCCESS; // Not an error, just nothing to do
     }
 
+    pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
+
     return status;
 }
 
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
similarity index 90%
rename from 
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
rename to 
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
index 7a839d1..8f8cebf 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
@@ -30,14 +30,13 @@ pubsub_websocket_topic_sender_t* 
pubsub_websocketTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        long serializerSvcId,
-        pubsub_serializer_service_t *ser);
+        const char *serType,
+        void *admin);
 void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t 
*sender);
 
 const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t 
*sender);
 const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t 
*sender);
 const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t 
*sender);
-
-long 
pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t 
*sender);
+const char 
*pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t 
*sender);
 
 #endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
diff --git a/bundles/pubsub/test/CMakeLists.txt 
b/bundles/pubsub/test/CMakeLists.txt
index 7be3658..fd16f9e 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -450,18 +450,18 @@ if (BUILD_PUBSUB_PSA_WS)
             LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
             PROPERTIES
-                LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
-                CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
-                USE_WEBSOCKETS=true
-                LISTENING_PORTS=8080
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+            USE_WEBSOCKETS=true
+            LISTENING_PORTS=8080
             BUNDLES
-                Celix::pubsub_serializer_json
-                Celix::http_admin
-                Celix::pubsub_topology_manager
-                Celix::pubsub_admin_websocket
-                pubsub_sut
-                pubsub_tst
-    )
+            Celix::pubsub_serializer_json
+            Celix::http_admin
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_websocket
+            pubsub_sut
+            pubsub_tst
+            )
     target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api 
${CppUTest_LIBRARIES} Jansson Celix::dfi civetweb_shared)
     target_include_directories(pubsub_websocket_tests SYSTEM PRIVATE 
${CppUTest_INCLUDE_DIR} test)
     add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests 
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>)
@@ -495,6 +495,58 @@ if (BUILD_PUBSUB_PSA_WS)
 
     add_test(NAME pstm_deadlock_websocket_test COMMAND 
pstm_deadlock_websocket_test WORKING_DIRECTORY 
$<TARGET_PROPERTY:pstm_deadlock_websocket_test,CONTAINER_LOC>)
     setup_target_for_coverage(pstm_deadlock_websocket_test SCAN_DIR ..)
+
+    add_celix_container(pubsub_websocket_v2_tests
+            USE_CONFIG
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            DIR ${CMAKE_CURRENT_BINARY_DIR}
+            PROPERTIES
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+            USE_WEBSOCKETS=true
+            LISTENING_PORTS=8080
+            BUNDLES
+            Celix::pubsub_serializer_json
+            Celix::http_admin
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_websocket_v2
+            pubsub_sut
+            pubsub_tst
+            pubsub_serializer
+            )
+    target_link_libraries(pubsub_websocket_v2_tests PRIVATE Celix::pubsub_api 
${CppUTest_LIBRARIES} Jansson civetweb_shared)
+    target_include_directories(pubsub_websocket_v2_tests SYSTEM PRIVATE 
${CppUTest_INCLUDE_DIR} test)
+    add_test(NAME pubsub_websocket_v2_tests COMMAND pubsub_websocket_v2_tests 
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_v2_tests,CONTAINER_LOC>)
+    setup_target_for_coverage(pubsub_websocket_v2_tests SCAN_DIR ..)
+
+    add_celix_container(pstm_deadlock_websocket_v2_test
+            USE_CONFIG #ensures that a config.properties will be created with 
the launch bundles.
+            LAUNCHER_SRC 
${CMAKE_CURRENT_LIST_DIR}/pstm_deadlock_test/test_runner.cc
+            DIR ${CMAKE_CURRENT_BINARY_DIR}
+            PROPERTIES
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+            BUNDLES
+            Celix::pubsub_serializer_json
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_websocket_v2
+            Celix::shell
+            Celix::shell_tui
+            pubsub_serializer
+            )
+    target_compile_definitions(pstm_deadlock_websocket_v2_test PRIVATE 
-DDEADLOCK_SUT_BUNDLE_FILE=\"${DEADLOCK_SUT_BUNDLE_FILE}\")
+    target_link_libraries(pstm_deadlock_websocket_v2_test PRIVATE 
Celix::pubsub_api GTest::gtest GTest::gtest_main Jansson Celix::dfi 
civetweb_shared)
+    target_include_directories(pstm_deadlock_websocket_v2_test SYSTEM PRIVATE 
pstm_deadlock_websocket_v2_test)
+
+    #Note: we do not link to bundles; as a result (to ensure a bundle zip file is 
created) a dependency on the bundle is needed.
+    add_dependencies(pstm_deadlock_websocket_v2_test 
pubsub_deadlock_sut_bundle)
+
+    #Framework "bundle" has no cache dir. By default the cwd is used as the 
"cache dir".
+    configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor 
${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_test/META-INF/descriptors/msg.descriptor
 COPYONLY)
+    
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties 
${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_test/META-INF/topics/pub/deadlock.properties
 COPYONLY)
+
+    add_test(NAME pstm_deadlock_websocket_v2_test COMMAND 
pstm_deadlock_websocket_v2_test WORKING_DIRECTORY 
$<TARGET_PROPERTY:pstm_deadlock_websocket_v2_test,CONTAINER_LOC>)
+    setup_target_for_coverage(pstm_deadlock_websocket_v2_test SCAN_DIR ..)
 endif()
 
 if (BUILD_PUBSUB_PSA_ZMQ)

Reply via email to