This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new de7c3a1  Added pubsub admin websocket
     new 41d1ba9  Merge pull request #39 from 
dhbfischer/feature/pubsub_admin_websocket
de7c3a1 is described below

commit de7c3a19359a9908c56801dd081630400729f288
Author: dfischer <m...@daanfischer.nl>
AuthorDate: Tue Aug 6 17:26:56 2019 +0200

    Added pubsub admin websocket
---
 bundles/pubsub/CMakeLists.txt                      |   1 +
 .../pubsub/pubsub_admin_websocket/CMakeLists.txt   |  53 ++
 .../pubsub_admin_websocket/src/psa_activator.c     | 128 ++++
 .../src/pubsub_psa_websocket_constants.h           |  48 ++
 .../src/pubsub_websocket_admin.c                   | 630 +++++++++++++++++
 .../src/pubsub_websocket_admin.h                   |  54 ++
 .../src/pubsub_websocket_common.c                  |  73 ++
 .../src/pubsub_websocket_common.h                  |  55 ++
 .../src/pubsub_websocket_topic_receiver.c          | 787 +++++++++++++++++++++
 .../src/pubsub_websocket_topic_receiver.h          |  50 ++
 .../src/pubsub_websocket_topic_sender.c            | 477 +++++++++++++
 .../src/pubsub_websocket_topic_sender.h            |  48 ++
 bundles/pubsub/test/CMakeLists.txt                 |  21 +
 bundles/pubsub/test/meta_data/ping.properties      |   1 +
 14 files changed, 2426 insertions(+)

diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 0373347..a084aa5 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -34,6 +34,7 @@ if (PUBSUB)
     add_subdirectory(pubsub_admin_tcp)
     add_subdirectory(pubsub_admin_nanomsg)
     add_subdirectory(pubsub_admin_udp_mc)
+    add_subdirectory(pubsub_admin_websocket)
     add_subdirectory(keygen)
     add_subdirectory(mock)
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt 
b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
new file mode 100644
index 0000000..c992d5e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_package(Jansson REQUIRED)
+find_package(UUID REQUIRED)
+
+if(NOT UUID_LIBRARY)
+    #i.e. not found for OSX
+    set(UUID_LIBRARY "")
+    set(UUID_INCLUDE_DIRS "")
+endif()
+
+add_celix_bundle(celix_pubsub_admin_websocket
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+    VERSION "1.0.0"
+    GROUP "Celix/PubSub"
+    SOURCES
+        src/psa_activator.c
+        src/pubsub_websocket_admin.c
+        src/pubsub_websocket_topic_sender.c
+        src/pubsub_websocket_topic_receiver.c
+        src/pubsub_websocket_common.c
+)
+
+set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH 
"$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket PRIVATE
+        Celix::pubsub_spi
+        Celix::framework Celix::dfi Celix::log_helper Celix::utils
+        Celix::http_admin_api
+)
+target_include_directories(celix_pubsub_admin_websocket PRIVATE
+    ${JANSSON_INCLUDE_DIR}
+    ${UUID_INCLUDE_DIRS}
+    src
+)
+
+install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT 
pubsub)
+target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c 
b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
new file mode 100644
index 0000000..a1d36b0
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+
+#include "celix_api.h"
+#include "pubsub_serializer.h"
+#include "log_helper.h"
+
+#include "pubsub_admin.h"
+#include "pubsub_admin_metrics.h"
+#include "pubsub_websocket_admin.h"
+#include "command.h"
+
+typedef struct psa_websocket_activator {
+    log_helper_t *logHelper;
+
+    pubsub_websocket_admin_t *admin;
+
+    long serializersTrackerId;
+
+    pubsub_admin_service_t adminService;
+    long adminSvcId;
+
+    pubsub_admin_metrics_service_t adminMetricsService;
+    long adminMetricsSvcId;
+
+    command_service_t cmdSvc;
+    long cmdSvcId;
+} psa_websocket_activator_t;
+
+int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t 
*ctx) {
+    act->adminSvcId = -1L;
+    act->cmdSvcId = -1L;
+    act->serializersTrackerId = -1L;
+
+    logHelper_create(ctx, &act->logHelper);
+    logHelper_start(act->logHelper);
+
+    act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
+    celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : 
CELIX_BUNDLE_EXCEPTION;
+
+    //track serializers (only json)
+    if (status == CELIX_SUCCESS) {
+        celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.filter = "(pubsub.serializer=json)";
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = act->admin;
+        opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
+        opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
+        act->serializersTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register pubsub admin service
+    if (status == CELIX_SUCCESS) {
+        pubsub_admin_service_t *psaSvc = &act->adminService;
+        psaSvc->handle = act->admin;
+        psaSvc->matchPublisher = pubsub_websocketAdmin_matchPublisher;
+        psaSvc->matchSubscriber = pubsub_websocketAdmin_matchSubscriber;
+        psaSvc->matchDiscoveredEndpoint = 
pubsub_websocketAdmin_matchDiscoveredEndpoint;
+        psaSvc->setupTopicSender = pubsub_websocketAdmin_setupTopicSender;
+        psaSvc->teardownTopicSender = 
pubsub_websocketAdmin_teardownTopicSender;
+        psaSvc->setupTopicReceiver = pubsub_websocketAdmin_setupTopicReceiver;
+        psaSvc->teardownTopicReceiver = 
pubsub_websocketAdmin_teardownTopicReceiver;
+        psaSvc->addDiscoveredEndpoint = 
pubsub_websocketAdmin_addDiscoveredEndpoint;
+        psaSvc->removeDiscoveredEndpoint = 
pubsub_websocketAdmin_removeDiscoveredEndpoint;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, 
PUBSUB_WEBSOCKET_ADMIN_TYPE);
+
+        act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, 
PUBSUB_ADMIN_SERVICE_NAME, props);
+    }
+
+    if (status == CELIX_SUCCESS) {
+        act->adminMetricsService.handle = act->admin;
+        act->adminMetricsService.metrics = pubsub_websocketAdmin_metrics;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, 
PUBSUB_WEBSOCKET_ADMIN_TYPE);
+
+        act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, 
&act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
+    }
+
+    //register shell command service
+    {
+        act->cmdSvc.handle = act->admin;
+        act->cmdSvc.executeCommand = pubsub_websocketAdmin_executeCommand;
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_websocket");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_websocket");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the 
information about the TopicSender and TopicReceivers for the websocket PSA");
+        act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, 
OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+    }
+
+    return status;
+}
+
+int psa_websocket_stop(psa_websocket_activator_t *act, celix_bundle_context_t 
*ctx) {
+    celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+    celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+    celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
+    celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+    pubsub_websocketAdmin_destroy(act->admin);
+
+    logHelper_stop(act->logHelper);
+    logHelper_destroy(&act->logHelper);
+
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(psa_websocket_activator_t, psa_websocket_start, 
psa_websocket_stop);
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
new file mode 100644
index 0000000..c6fba82
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_
+#define PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_
+
+#define PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE        30
+#define PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE       70
+#define PSA_WEBSOCKET_DEFAULT_SCORE                   30
+
+#define PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY            
"PSA_WEBSOCKET_QOS_SAMPLE_SCORE"
+#define PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY           
"PSA_WEBSOCKET_QOS_CONTROL_SCORE"
+#define PSA_WEBSOCKET_DEFAULT_SCORE_KEY               
"PSA_WEBSOCKET_DEFAULT_SCORE"
+
+
+#define PSA_WEBSOCKET_METRICS_ENABLED                 
"PSA_WEBSOCKET_METRICS_ENABLED"
+#define PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED         true
+
+#define PUBSUB_WEBSOCKET_VERBOSE_KEY                  "PSA_WEBSOCKET_VERBOSE"
+#define PUBSUB_WEBSOCKET_VERBOSE_DEFAULT              true
+
+#define PUBSUB_WEBSOCKET_ADMIN_TYPE                   "websocket"
+#define PUBSUB_WEBSOCKET_ADDRESS_KEY                  
"websocket.socket_address"
+#define PUBSUB_WEBSOCKET_PORT_KEY                     "websocket.socket_port"
+
+/**
+ * The static url which a subscriber should try to connect to.
+ * The urls are space separated
+ */
+#define PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES    
"websocket.static.connect.socket_addresses"
+
+#endif /* PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
new file mode 100644
index 0000000..a7fbe29
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory.h>
+#include <pubsub_endpoint.h>
+#include <pubsub_serializer.h>
+#include <ip_utils.h>
+
+#include "pubsub_utils.h"
+#include "pubsub_websocket_admin.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_topic_sender.h"
+#include "pubsub_websocket_topic_receiver.h"
+#include "pubsub_websocket_common.h"
+
+#define L_DEBUG(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_websocket_admin {
+    celix_bundle_context_t *ctx;
+    log_helper_t *log;
+    const char *fwUUID;
+
+    double qosSampleScore;
+    double qosControlScore;
+    double defaultScore;
+
+    bool verbose;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = svcId, value = 
psa_websocket_serializer_entry_t*
+    } serializers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = 
pubsub_websocket_topic_sender_t*
+    } topicSenders;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = scope:topic key, value = 
pubsub_websocket_topic_sender_t*
+    } topicReceivers;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* 
(endpoint)
+    } discoveredEndpoints;
+
+};
+
+typedef struct psa_websocket_serializer_entry {
+    const char *serType;
+    long svcId;
+    pubsub_serializer_service_t *svc;
+} psa_websocket_serializer_entry_t;
+
+static celix_status_t 
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, 
pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint);
+static celix_status_t 
pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* 
psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint);
+
+
+pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t 
*ctx, log_helper_t *logHelper) {
+    pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
+    psa->ctx = ctx;
+    psa->log = logHelper;
+    psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, 
PUBSUB_WEBSOCKET_VERBOSE_KEY, PUBSUB_WEBSOCKET_VERBOSE_DEFAULT);
+    psa->fwUUID = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+
+    psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_DEFAULT_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_SCORE);
+    psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
+    psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
+
+    celixThreadMutex_create(&psa->serializers.mutex, NULL);
+    psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+    celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
+    psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->topicReceivers.mutex, NULL);
+    psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+
+    celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
+    psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+
+    return psa;
+}
+
+void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
+    if (psa == NULL) {
+        return;
+    }
+
+    //note assuming al psa register services and service tracker are removed.
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_sender_t *sender = 
hashMapIterator_nextValue(&iter);
+        pubsub_websocketTopicSender_destroy(sender);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_receiver_t *recv = 
hashMapIterator_nextValue(&iter);
+        pubsub_websocketTopicReceiver_destroy(recv);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    iter = hashMapIterator_construct(psa->discoveredEndpoints.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_properties_t *ep = hashMapIterator_nextValue(&iter);
+        celix_properties_destroy(ep);
+    }
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    iter = hashMapIterator_construct(psa->serializers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_serializer_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    celixThreadMutex_destroy(&psa->topicSenders.mutex);
+    hashMap_destroy(psa->topicSenders.map, true, false);
+
+    celixThreadMutex_destroy(&psa->topicReceivers.mutex);
+    hashMap_destroy(psa->topicReceivers.map, true, false);
+
+    celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
+    hashMap_destroy(psa->discoveredEndpoints.map, false, false);
+
+    celixThreadMutex_destroy(&psa->serializers.mutex);
+    hashMap_destroy(psa->serializers.map, false, false);
+
+    free(psa);
+}
+
+void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    const char *serType = celix_properties_get(props, 
PUBSUB_SERIALIZER_TYPE_KEY, NULL);
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
+
+    if (serType == NULL) {
+        L_INFO("[PSA_WEBSOCKET] Ignoring serializer service without %s 
property", PUBSUB_SERIALIZER_TYPE_KEY);
+        return;
+    }
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_websocket_serializer_entry_t *entry = 
hashMap_get(psa->serializers.map, (void*)svcId);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->serType = serType;
+        entry->svcId = svcId;
+        entry->svc = svc;
+        hashMap_put(psa->serializers.map, (void*)svcId, entry);
+    }
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+}
+
+void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props) {
+    pubsub_websocket_admin_t *psa = handle;
+    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
+
+    //remove serializer
+    // 1) First find entry and
+    // 2) loop and destroy all topic sender using the serializer and
+    // 3) loop and destroy all topic receivers using the serializer
+    // Note that it is the responsibility of the topology manager to create 
new topic senders/receivers
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    psa_websocket_serializer_entry_t *entry = 
hashMap_remove(psa->serializers.map, (void*)svcId);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (entry != NULL) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_websocket_topic_sender_t *sender = 
hashMapEntry_getValue(senderEntry);
+            if (sender != NULL && entry->svcId == 
pubsub_websocketTopicSender_serializerSvcId(sender)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_websocketTopicSender_destroy(sender);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        iter = hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+            pubsub_websocket_topic_receiver_t *receiver = 
hashMapEntry_getValue(senderEntry);
+            if (receiver != NULL && entry->svcId == 
pubsub_websocketTopicReceiver_serializerSvcId(receiver)) {
+                char *key = hashMapEntry_getKey(senderEntry);
+                hashMapIterator_remove(&iter);
+                pubsub_websocketTopicReceiver_destroy(receiver);
+                free(key);
+            }
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+        free(entry);
+    }
+}
+
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t 
**topicProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_websocket_admin_t *psa = handle;
+    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, 
svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+                                               psa->qosSampleScore, 
psa->qosControlScore, psa->defaultScore,
+                                               topicProperties, 
outSerializerSvcId);
+    *outScore = score;
+
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long 
svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t 
**topicProperties, double *outScore, long *outSerializerSvcId) {
+    pubsub_websocket_admin_t *psa = handle;
+    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
+    celix_status_t  status = CELIX_SUCCESS;
+    double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, 
svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+                                                psa->qosSampleScore, 
psa->qosControlScore, psa->defaultScore,
+                                                topicProperties, 
outSerializerSvcId);
+    if (outScore != NULL) {
+        *outScore = score;
+    }
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint, bool *outMatch) {
+    pubsub_websocket_admin_t *psa = handle;
+    L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
+    celix_status_t  status = CELIX_SUCCESS;
+    bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, 
PUBSUB_WEBSOCKET_ADMIN_TYPE, NULL);
+    if (outMatch != NULL) {
+        *outMatch = match;
+    }
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char 
*scope, const char *topic, const celix_properties_t *topicProperties, long 
serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    //1) Create TopicSender
+    //2) Store TopicSender
+    //3) Connect existing endpoints
+    //4) set outPublisherEndpoint
+
+    celix_properties_t *newEndpoint = NULL;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    pubsub_websocket_topic_sender_t *sender = 
hashMap_get(psa->topicSenders.map, key);
+    if (sender == NULL) {
+        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, 
scope, topic, serializerSvcId, serEntry->svc);
+        }
+        if (sender != NULL) {
+            const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, 
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+                                                serType, NULL);
+
+            //Set endpoint visibility to local because the http server handles 
discovery
+            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, 
PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, 
"CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicSenders.map, key, sender);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Error creating a TopicSender");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicSender for 
scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
+        *outPublisherEndpoint = newEndpoint;
+    }
+
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const 
char *scope, const char *topic) {
+    pubsub_websocket_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    //1) Find and remove TopicSender from map
+    //2) destroy topic sender
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key);
+    if (entry != NULL) {
+        char *mapKey = hashMapEntry_getKey(entry);
+        pubsub_websocket_topic_sender_t *sender = 
hashMap_remove(psa->topicSenders.map, key);
+        free(mapKey);
+        pubsub_websocketTopicSender_destroy(sender);
+    } else {
+        L_ERROR("[PSA_WEBSOCKET] Cannot teardown TopicSender with scope/topic 
%s/%s. Does not exists", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    free(key);
+
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const 
char *scope, const char *topic, const celix_properties_t *topicProperties, long 
serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    celix_properties_t *newEndpoint = NULL;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    pubsub_websocket_topic_receiver_t *receiver = 
hashMap_get(psa->topicReceivers.map, key);
+    if (receiver == NULL) {
+        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serializerSvcId);
+        if (serEntry != NULL) {
+            receiver = pubsub_websocketTopicReceiver_create(psa->ctx, 
psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicSender 
%s/%s", scope, topic);
+        }
+        if (receiver != NULL) {
+            const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
+            const char *serType = serEntry->serType;
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
+                                                
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL);
+
+            //Set endpoint visibility to local because the http server handles 
discovery
+            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, 
PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+
+            //if available also set container name
+            const char *cn = celix_bundleContext_getProperty(psa->ctx, 
"CELIX_CONTAINER_NAME", NULL);
+            if (cn != NULL) {
+                celix_properties_set(newEndpoint, "container_name", cn);
+            }
+            hashMap_put(psa->topicReceivers.map, key, receiver);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Error creating a TopicReceiver.");
+            free(key);
+        }
+    } else {
+        free(key);
+        L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicReceiver 
for scope/topic %s/%s!", scope, topic);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    if (receiver != NULL && newEndpoint != NULL) {
+        celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(psa->discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            celix_properties_t *endpoint = hashMapIterator_nextValue(&iter);
+            const char *type = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TYPE, NULL);
+            if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+                pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, 
endpoint);
+            }
+        }
+        celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+    }
+
+    if (newEndpoint != NULL && outSubscriberEndpoint != NULL) {
+        *outSubscriberEndpoint = newEndpoint;
+    }
+
+    celix_status_t status = CELIX_SUCCESS;
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const 
char *scope, const char *topic) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key);
+    free(key);
+    if (entry != NULL) {
+        char *receiverKey = hashMapEntry_getKey(entry);
+        pubsub_websocket_topic_receiver_t *receiver = 
hashMapEntry_getValue(entry);
+        hashMap_remove(psa->topicReceivers.map, receiverKey);
+
+        free(receiverKey);
+        pubsub_websocketTopicReceiver_destroy(receiver);
+    }
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+static celix_status_t 
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, 
pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+    const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, 
NULL);
+    const char *eScope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *sockAddress = celix_properties_get(endpoint, 
PUBSUB_WEBSOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, 
PUBSUB_WEBSOCKET_PORT_KEY, -1L);
+
+    bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, 
type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (publisher && (sockAddress == NULL || sockPort < 0)) {
+        L_WARN("[PSA WEBSOCKET] Error got endpoint without websocket 
address/port or endpoint type. Properties:");
+        const char *key = NULL;
+        CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+            L_WARN("[PSA WEBSOCKET] |- %s=%s\n", key, 
celix_properties_get(endpoint, key, NULL));
+        }
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        if (eScope != NULL && eTopic != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 &&
+            strncmp(eTopic, topic, 1024 * 1024) == 0) {
+            char *uri = psa_websocket_createURI(eScope, eTopic);
+            pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress, 
sockPort, uri);
+        }
+    }
+
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const 
celix_properties_t *endpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, 
NULL);
+
+    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_websocket_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter);
+            pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, 
endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    celix_properties_t *cpy = celix_properties_copy(endpoint);
+    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL);
+    hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+
+static celix_status_t 
pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* 
psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint) {
+    //note can be called with discoveredEndpoint.mutex lock
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+    const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+    const char *eScope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+
+    if (eScope != NULL && eTopic != NULL &&
+            strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 
1024 * 1024) == 0) {
+        char *uri = psa_websocket_createURI(eScope, eTopic);
+        pubsub_websocketTopicReceiver_disconnectFrom(receiver, uri);
+
+    }
+
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint) {
+    pubsub_websocket_admin_t *psa = handle;
+
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, 
NULL);
+
+    if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_websocket_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter);
+            pubsub_websocketAdmin_disconnectEndpointFromReceiver(psa, 
receiver, endpoint);
+        }
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+
+    celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, 
NULL);
+    celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, 
(void*)uuid);
+    celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
+
+    if (found != NULL) {
+        celix_properties_destroy(found);
+    }
+
+    celix_status_t  status = CELIX_SUCCESS;
+    return status;
+}
+
+celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char 
*commandLine __attribute__((unused)), FILE *out, FILE *errStream 
__attribute__((unused))) {
+    pubsub_websocket_admin_t *psa = handle;
+    celix_status_t  status = CELIX_SUCCESS;
+
+    fprintf(out, "\n");
+    fprintf(out, "Topic Senders:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_sender_t *sender = 
hashMapIterator_nextValue(&iter);
+        long serSvcId = pubsub_websocketTopicSender_serializerSvcId(sender);
+        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *scope = pubsub_websocketTopicSender_scope(sender);
+        const char *topic = pubsub_websocketTopicSender_topic(sender);
+        const char *url = pubsub_websocketTopicSender_url(sender);
+//        const char *postUrl = pubsub_websocketTopicSender_isStatic(sender) ? 
" (static)" : "";
+        fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- url            = %s\n", url);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+
+    fprintf(out, "\n");
+    fprintf(out, "\nTopic Receivers:\n");
+    celixThreadMutex_lock(&psa->serializers.mutex);
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter);
+        long serSvcId = 
pubsub_websocketTopicReceiver_serializerSvcId(receiver);
+        psa_websocket_serializer_entry_t *serEntry = 
hashMap_get(psa->serializers.map, (void*)serSvcId);
+        const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+        const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
+        const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+
+        celix_array_list_t *connected = celix_arrayList_create();
+        celix_array_list_t *unconnected = celix_arrayList_create();
+        pubsub_websocketTopicReceiver_listConnections(receiver, connected, 
unconnected);
+
+        fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+        fprintf(out, "   |- serializer type = %s\n", serType);
+        for (int i = 0; i < celix_arrayList_size(connected); ++i) {
+            char *url = celix_arrayList_get(connected, i);
+            fprintf(out, "   |- connected url   = %s\n", url);
+            free(url);
+        }
+        for (int i = 0; i < celix_arrayList_size(unconnected); ++i) {
+            char *url = celix_arrayList_get(unconnected, i);
+            fprintf(out, "   |- unconnected url = %s\n", url);
+            free(url);
+        }
+        celix_arrayList_destroy(connected);
+        celix_arrayList_destroy(unconnected);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    celixThreadMutex_unlock(&psa->serializers.mutex);
+    fprintf(out, "\n");
+
+    return status;
+}
+
+pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle) {
+    pubsub_websocket_admin_t *psa = handle;
+    pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
PUBSUB_WEBSOCKET_ADMIN_TYPE);
+    result->senders = celix_arrayList_create();
+    result->receivers = celix_arrayList_create();
+
+    celixThreadMutex_lock(&psa->topicSenders.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_sender_t *sender = 
hashMapIterator_nextValue(&iter);
+        pubsub_admin_sender_metrics_t *metrics = 
pubsub_websocketTopicSender_metrics(sender);
+        celix_arrayList_add(result->senders, metrics);
+    }
+    celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+    celixThreadMutex_lock(&psa->topicReceivers.mutex);
+    iter = hashMapIterator_construct(psa->topicReceivers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_websocket_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter);
+        pubsub_admin_receiver_metrics_t *metrics = 
pubsub_websocketTopicReceiver_metrics(receiver);
+        celix_arrayList_add(result->receivers, metrics);
+    }
+    celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+
+    return result;
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
new file mode 100644
index 0000000..62a14a9
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+#define CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_api.h"
+#include "log_helper.h"
+#include "pubsub_psa_websocket_constants.h"
+
+typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
+
+pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t 
*ctx, log_helper_t *logHelper);
+void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
+
+celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t 
**topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long 
svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t 
**topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint, bool *match);
+
+celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char 
*scope, const char *topic, const celix_properties_t* topicProperties, long 
serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const 
char *scope, const char *topic);
+
+celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const 
char *scope, const char *topic, const celix_properties_t* topicProperties, long 
serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const 
char *scope, const char *topic);
+
+celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const 
celix_properties_t *endpoint);
+celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint);
+
+void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props);
+void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props);
+
+celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char 
*commandLine, FILE *outStream, FILE *errStream);
+
+pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
new file mode 100644
index 0000000..ce85e40
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory.h>
+#include <assert.h>
+#include <stdio.h>
+#include "pubsub_websocket_common.h"
+
+int psa_websocket_localMsgTypeIdForMsgType(void* handle 
__attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
+    *msgTypeId = utils_stringHash(msgType);
+    return 0;
+}
+
+bool psa_websocket_checkVersion(version_pt msgVersion, const 
pubsub_websocket_msg_header_t *hdr) {
+    bool check=false;
+    int major=0,minor=0;
+
+    if (hdr->major == 0 && hdr->minor == 0) {
+        //no check
+        return true;
+    }
+
+    if (msgVersion!=NULL) {
+        version_getMajor(msgVersion,&major);
+        version_getMinor(msgVersion,&minor);
+        if (hdr->major==((unsigned char)major)) { /* Different major means 
incompatible */
+            check = (hdr->minor>=((unsigned char)minor)); /* Compatible only 
if the provider has a minor equals or greater (means compatible update) */
+        }
+    }
+
+    return check;
+}
+
+void psa_websocket_setScopeAndTopicFilter(const char* scope, const char 
*topic, char *filter) {
+    for (int i = 0; i < 5; ++i) {
+        filter[i] = '\0';
+    }
+    if (scope != NULL && strnlen(scope, 3) >= 2)  {
+        filter[0] = scope[0];
+        filter[1] = scope[1];
+    }
+    if (topic != NULL && strnlen(topic, 3) >= 2)  {
+        filter[2] = topic[0];
+        filter[3] = topic[1];
+    }
+}
+
+char *psa_websocket_createURI(const char *scope, const char *topic) {
+    char *uri = NULL;
+    if(scope != NULL && topic != NULL) {
+        asprintf(&uri, "/pubsub/%s/%s", scope, topic);
+    }
+    else if(scope == NULL && topic != NULL) {
+        asprintf(&uri, "/pubsub/default/%s", topic);
+    }
+    return uri;
+}
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
new file mode 100644
index 0000000..89ec65a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_COMMON_H
+#define CELIX_PUBSUB_WEBSOCKET_COMMON_H
+
+#include <utils.h>
+#include <stdint.h>
+
+#include "version.h"
+
+
+struct pubsub_websocket_msg_header {
+    uint32_t type; //msg type id (hash of fqn)
+    uint8_t major;
+    uint8_t minor;
+    uint32_t seqNr;
+    unsigned char originUUID[16];
+    uint64_t sendtimeSeconds; //seconds since epoch
+    uint64_t sendTimeNanoseconds; //ns since epoch
+};
+
+typedef struct pubsub_websocket_msg_header pubsub_websocket_msg_header_t;
+
+struct pubsub_websocket_msg {
+    pubsub_websocket_msg_header_t header;
+    unsigned int payloadSize;
+    char payload[];
+};
+
+typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
+
+int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
+void psa_websocket_setScopeAndTopicFilter(const char* scope, const char 
*topic, char *filter);
+char *psa_websocket_createURI(const char *scope, const char *topic);
+
+bool psa_websocket_checkVersion(version_pt msgVersion, const 
pubsub_websocket_msg_header_t *hdr);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_COMMON_H
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
new file mode 100644
index 0000000..4a41a0e
--- /dev/null
+++ 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -0,0 +1,787 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <sys/epoll.h>
+#include <assert.h>
+#include <pubsub_endpoint.h>
+#include <arpa/inet.h>
+#include <log_helper.h>
+#include <math.h>
+#include "pubsub_websocket_topic_receiver.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_common.h"
+
+#include <uuid/uuid.h>
+#include <pubsub_admin_metrics.h>
+#include <http_admin/api.h>
+
+#ifndef UUID_STR_LEN
+#define UUID_STR_LEN 37
+#endif
+
+
+#define L_DEBUG(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+typedef struct pubsub_websocket_rcv_buffer {
+    celix_thread_mutex_t mutex;
+    celix_array_list_t *list;     //List of received websocket messages (type: 
pubsub_websocket_msg_t *)
+    celix_array_list_t *rcvTimes; //Corresponding receive times of the 
received websocket messages (rcvTimes[i] -> list[i])
+} pubsub_websocket_rcv_buffer_t;
+
+struct pubsub_websocket_topic_receiver {
+    celix_bundle_context_t *ctx;
+    log_helper_t *logHelper;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
+    char *scope;
+    char *topic;
+    char scopeAndTopicFilter[5];
+    char *uri;
+    bool metricsEnabled;
+
+    pubsub_websocket_rcv_buffer_t recvBuffer;
+
+    struct {
+        celix_thread_t thread;
+        celix_thread_mutex_t mutex;
+        bool running;
+    } recvThread;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = url (host:port), value = 
psa_websocket_requested_connection_entry_t*
+        bool allConnected; //true if all requestedConnectection are connected
+    } requestedConnections;
+
+    long subscriberTrackerId;
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = bnd id, value = 
psa_websocket_subscriber_entry_t
+        bool allInitialized;
+    } subscribers;
+};
+
+typedef struct psa_websocket_requested_connection_entry {
+    pubsub_websocket_rcv_buffer_t *recvBuffer;
+    char *key; //host:port
+    char *socketAddress;
+    long socketPort;
+    char *uri;
+    struct mg_connection *sockConnection;
+    int connectRetryCount;
+    bool connected;
+    bool statically; //true if the connection is statically configured through 
the topic properties.
+} psa_websocket_requested_connection_entry_t;
+
+typedef struct psa_websocket_subscriber_metrics_entry_t {
+    unsigned int msgTypeId;
+    uuid_t origin;
+
+    unsigned long nrOfMessagesReceived;
+    unsigned long nrOfSerializationErrors;
+    struct timespec lastMessageReceived;
+    double averageTimeBetweenMessagesInSeconds;
+    double averageSerializationTimeInSeconds;
+    double averageDelayInSeconds;
+    double maxDelayInSeconds;
+    double minDelayInSeconds;
+    unsigned int lastSeqNr;
+    unsigned long nrOfMissingSeqNumbers;
+} psa_websocket_subscriber_metrics_entry_t;
+
+typedef struct psa_websocket_subscriber_entry {
+    int usageCount;
+    hash_map_t *msgTypes; //map from serializer svc
+    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin 
uuid, value = psa_websocket_subscriber_metrics_entry_t*
+    pubsub_subscriber_t *svc;
+    bool initialized; //true if the init function is called through the 
receive thread
+} psa_websocket_subscriber_entry_t;
+
+
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void 
*svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void 
*svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void* psa_websocket_recvThread(void * data);
+static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
 *receiver);
+static void 
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t 
*receiver);
+
+static int psa_websocketTopicReceiver_data(struct mg_connection *connection, 
int op_code, char *data, size_t length, void *handle);
+static void psa_websocketTopicReceiver_close(const struct mg_connection 
*connection, void *handle);
+
+
+pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx,
+                                                              log_helper_t 
*logHelper,
+                                                              const char 
*scope,
+                                                              const char 
*topic,
+                                                              const 
celix_properties_t *topicProperties,
+                                                              long 
serializerSvcId,
+                                                              
pubsub_serializer_service_t *serializer) {
+    pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+    receiver->ctx = ctx;
+    receiver->logHelper = logHelper;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+    psa_websocket_setScopeAndTopicFilter(scope, topic, 
receiver->scopeAndTopicFilter);
+    receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
+
+    receiver->uri = psa_websocket_createURI(scope, topic);
+
+    if (receiver->uri != NULL) {
+        celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+        celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+        celixThreadMutex_create(&receiver->recvThread.mutex, NULL);
+        celixThreadMutex_create(&receiver->recvBuffer.mutex, NULL);
+
+        receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+        receiver->requestedConnections.map = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
+        arrayList_create(&receiver->recvBuffer.list);
+        arrayList_create(&receiver->recvBuffer.rcvTimes);
+    }
+
+    //track subscribers
+    if (receiver->uri != NULL) {
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
topic);
+        char buf[size+1];
+        snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
topic);
+        celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+        opts.filter.filter = buf;
+        opts.callbackHandle = receiver;
+        opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber;
+        opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber;
+
+        receiver->subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+
+    const char *staticConnects = celix_properties_get(topicProperties, 
PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
+    if (staticConnects != NULL) {
+        char *copy = strndup(staticConnects, 1024*1024);
+        char* addr;
+        char* save = copy;
+
+        while ((addr = strtok_r(save, " ", &save))) {
+            char *colon = strchr(addr, ':');
+            if (colon == NULL) {
+                continue;
+            }
+
+            char *sockAddr = NULL;
+            asprintf(&sockAddr, "%.*s", (int)(colon - addr), addr);
+
+            long sockPort = atol((colon + 1));
+
+            char *key = NULL;
+            asprintf(&key, "%s:%li", sockAddr, sockPort);
+
+
+            if (sockPort > 0) {
+                psa_websocket_requested_connection_entry_t *entry = calloc(1, 
sizeof(*entry));
+                entry->key = key;
+                entry->uri = strndup(receiver->uri, 1024 * 1024);
+                entry->socketAddress = sockAddr;
+                entry->socketPort = sockPort;
+                entry->connected = false;
+                entry->statically = true;
+                entry->recvBuffer = &receiver->recvBuffer;
+                hashMap_put(receiver->requestedConnections.map, (void *) 
entry->key, entry);
+            } else {
+                L_WARN("[PSA_WEBSOCKET_TR] Invalid static socket address %s", 
addr);
+                free(key);
+                free(sockAddr);
+            }
+        }
+        free(copy);
+    }
+
+
+    if (receiver->uri != NULL) {
+        receiver->recvThread.running = true;
+        celixThread_create(&receiver->recvThread.thread, NULL, 
psa_websocket_recvThread, receiver);
+        char name[64];
+        snprintf(name, 64, "WEBSOCKET TR %s/%s", scope, topic);
+        celixThread_setName(&receiver->recvThread.thread, name);
+    }
+
+    if (receiver->uri == NULL) {
+        free(receiver->scope);
+        free(receiver->topic);
+        free(receiver);
+        receiver = NULL;
+        L_ERROR("[PSA_WEBSOCKET] Cannot create TopicReceiver for %s/%s", 
scope, topic);
+    }
+
+    return receiver;
+}
+
+void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t 
*receiver) {
+    if (receiver != NULL) {
+
+        celixThreadMutex_lock(&receiver->recvThread.mutex);
+        receiver->recvThread.running = false;
+        celixThreadMutex_unlock(&receiver->recvThread.mutex);
+        celixThread_join(receiver->recvThread.thread, NULL);
+
+        celix_bundleContext_stopTracker(receiver->ctx, 
receiver->subscriberTrackerId);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (entry != NULL)  {
+                hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->metrics);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+                    hashMap_destroy(origins, true, true);
+                }
+                hashMap_destroy(entry->metrics, false, false);
+
+                
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
+                free(entry);
+            }
+
+        }
+        hashMap_destroy(receiver->subscribers.map, false, false);
+
+
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_websocket_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (entry != NULL) {
+                if(entry->connected) {
+                    mg_close_connection(entry->sockConnection);
+                }
+                free(entry->uri);
+                free(entry->socketAddress);
+                free(entry->key);
+                free(entry);
+            }
+        }
+        hashMap_destroy(receiver->requestedConnections.map, false, false);
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_destroy(&receiver->subscribers.mutex);
+        celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+        celixThreadMutex_destroy(&receiver->recvThread.mutex);
+
+        celixThreadMutex_destroy(&receiver->recvBuffer.mutex);
+        int msgBufSize = celix_arrayList_size(receiver->recvBuffer.list);
+        while(msgBufSize > 0) {
+            pubsub_websocket_msg_t *msg = 
celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1);
+            free(msg);
+            msgBufSize--;
+        }
+        celix_arrayList_destroy(receiver->recvBuffer.list);
+
+        int rcvTimesSize = celix_arrayList_size(receiver->recvBuffer.rcvTimes);
+        while(rcvTimesSize > 0) {
+            struct timespec *time = 
celix_arrayList_get(receiver->recvBuffer.rcvTimes, rcvTimesSize - 1);
+            free(time);
+            rcvTimesSize--;
+        }
+        celix_arrayList_destroy(receiver->recvBuffer.rcvTimes);
+
+        free(receiver->uri);
+        free(receiver->scope);
+        free(receiver->topic);
+    }
+    free(receiver);
+}
+
+const char* 
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t 
*receiver) {
+    return receiver->scope;
+}
+const char* 
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t 
*receiver) {
+    return receiver->topic;
+}
+
+long 
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t 
*receiver) {
+    return receiver->serializerSvcId;
+}
+
+void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t 
*unconnectedUrls) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        char *url = NULL;
+        asprintf(&url, "%s%s", entry->uri, entry->statically ? " (static)" : 
"");
+        if (entry->connected) {
+            celix_arrayList_add(connectedUrls, url);
+        } else {
+            celix_arrayList_add(unconnectedUrls, url);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort, const char *uri) {
+    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s connecting to websocket uri 
%s", receiver->scope, receiver->topic, uri);
+
+    char *key = NULL;
+    asprintf(&key, "%s:%li", socketAddress, socketPort);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_websocket_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, key);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->key = key;
+        entry->uri = strndup(uri, 1024 * 1024);
+        entry->socketAddress = strndup(socketAddress, 1024 * 1024);
+        entry->socketPort = socketPort;
+        entry->connected = false;
+        entry->statically = false;
+        entry->recvBuffer = &receiver->recvBuffer;
+        hashMap_put(receiver->requestedConnections.map, (void*)entry->uri, 
entry);
+        receiver->requestedConnections.allConnected = false;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    psa_websocket_connectToAllRequestedConnections(receiver);
+}
+
+void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t 
*receiver, const char *uri) {
+    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s disconnect from websocket uri 
%s", receiver->scope, receiver->topic, uri);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_websocket_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, uri);
+    if (entry != NULL && entry->connected) {
+        mg_close_connection(entry->sockConnection);
+        L_WARN("[PSA_WEBSOCKET] Error disconnecting from websocket uri %s.", 
uri);
+    }
+    if (entry != NULL) {
+        free(entry->socketAddress);
+        free(entry->uri);
+        free(entry->key);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void 
*svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+    pubsub_websocket_topic_receiver_t *receiver = handle;
+
+    long bndId = celix_bundle_getId(bnd);
+    const char *subScope = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_SCOPE, "default");
+    if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+        //not the same scope. ignore
+        return;
+    }
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_websocket_subscriber_entry_t *entry = 
hashMap_get(receiver->subscribers.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->usageCount += 1;
+    } else {
+        //new create entry
+        entry = calloc(1, sizeof(*entry));
+        entry->usageCount = 1;
+        entry->svc = svc;
+        entry->initialized = false;
+
+        int rc = 
receiver->serializer->createSerializerMap(receiver->serializer->handle, 
(celix_bundle_t*)bnd, &entry->msgTypes);
+
+        if (rc == 0) {
+            entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
+            hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                pubsub_msg_serializer_t *msgSer = 
hashMapIterator_nextValue(&iter);
+                hash_map_t *origins = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+                hashMap_put(entry->metrics, (void*)(uintptr_t)msgSer->msgId, 
origins);
+            }
+        }
+
+        if (rc == 0) {
+            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+        } else {
+            L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for 
TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            free(entry);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void 
*svc __attribute__((unused)), const celix_properties_t *props 
__attribute__((unused)), const celix_bundle_t *bnd) {
+    pubsub_websocket_topic_receiver_t *receiver = handle;
+
+    long bndId = celix_bundle_getId(bnd);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_websocket_subscriber_entry_t *entry = 
hashMap_get(receiver->subscribers.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->usageCount -= 1;
+    }
+    if (entry != NULL && entry->usageCount <= 0) {
+        //remove entry
+        hashMap_remove(receiver->subscribers.map, (void*)bndId);
+        int rc = 
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
+        if (rc != 0) {
+            L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for 
TopicReceiver %s/%s", receiver->scope, receiver->topic);
+        }
+        hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_t *origins = hashMapIterator_nextValue(&iter);
+            hashMap_destroy(origins, true, true);
+        }
+        hashMap_destroy(entry->metrics, false, false);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static inline void 
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, 
psa_websocket_subscriber_entry_t* entry, const pubsub_websocket_msg_header_t 
*hdr, const void* payload, size_t payloadSize, struct timespec *receiveTime) {
+    //NOTE receiver->subscribers.mutex locked
+    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, 
(void*)(uintptr_t)(hdr->type));
+    pubsub_subscriber_t *svc = entry->svc;
+    bool monitor = receiver->metricsEnabled;
+
+    //monitoring
+    struct timespec beginSer;
+    struct timespec endSer;
+    int updateReceiveCount = 0;
+    int updateSerError = 0;
+
+    if (msgSer!= NULL) {
+        void *deserializedMsg = NULL;
+        bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion, 
hdr);
+        if (validVersion) {
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &beginSer);
+            }
+            celix_status_t status = msgSer->deserialize(msgSer->handle, 
payload, payloadSize, &deserializedMsg);
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &endSer);
+            }
+            if (status == CELIX_SUCCESS) {
+                bool release = true;
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deserializedMsg, &release);
+                if (release) {
+                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
+                }
+                updateReceiveCount += 1;
+            } else {
+                updateSerError += 1;
+                L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
+            }
+        }
+    } else {
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X", 
hdr->type);
+    }
+
+    if (msgSer != NULL && monitor) {
+        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t 
)hdr->type);
+        char uuidStr[UUID_STR_LEN+1];
+        uuid_unparse(hdr->originUUID, uuidStr);
+        psa_websocket_subscriber_metrics_entry_t *metrics = 
hashMap_get(origins, uuidStr);
+
+        if (metrics == NULL) {
+            metrics = calloc(1, sizeof(*metrics));
+            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
+            uuid_copy(metrics->origin, hdr->originUUID);
+            metrics->msgTypeId = hdr->type;
+            metrics->maxDelayInSeconds = -INFINITY;
+            metrics->minDelayInSeconds = INFINITY;
+            metrics->lastSeqNr = 0;
+        }
+
+        double diff = celix_difftime(&beginSer, &endSer);
+        long n = metrics->nrOfMessagesReceived;
+        metrics->averageSerializationTimeInSeconds = 
(metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
+
+        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+        n = metrics->nrOfMessagesReceived;
+        if (metrics->nrOfMessagesReceived >= 1) {
+            metrics->averageTimeBetweenMessagesInSeconds = 
(metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
+        }
+        metrics->lastMessageReceived = *receiveTime;
+
+
+        int incr = hdr->seqNr - metrics->lastSeqNr;
+        if (metrics->lastSeqNr >0 && incr > 1) {
+            metrics->nrOfMissingSeqNumbers += (incr - 1);
+            L_WARN("Missing message seq nr went from %i to %i", 
metrics->lastSeqNr, hdr->seqNr);
+        }
+        metrics->lastSeqNr = hdr->seqNr;
+
+        struct timespec sendTime;
+        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
+        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the 
tv_nsec is not correct
+        diff = celix_difftime(&sendTime, receiveTime);
+        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + 
diff) / (n+1);
+        if (diff < metrics->minDelayInSeconds) {
+            metrics->minDelayInSeconds = diff;
+        }
+        if (diff > metrics->maxDelayInSeconds) {
+            metrics->maxDelayInSeconds = diff;
+        }
+
+        metrics->nrOfMessagesReceived += updateReceiveCount;
+        metrics->nrOfSerializationErrors += updateSerError;
+    }
+}
+
+static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, 
const pubsub_websocket_msg_header_t *hdr, const char *payload, size_t 
payloadSize, struct timespec *receiveTime) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        if (entry != NULL) {
+            processMsgForSubscriberEntry(receiver, entry, hdr, payload, 
payloadSize, receiveTime);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
+
+static void* psa_websocket_recvThread(void * data) {
+    pubsub_websocket_topic_receiver_t *receiver = data;
+
+    celixThreadMutex_lock(&receiver->recvThread.mutex);
+    bool running = receiver->recvThread.running;
+    celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    bool allConnected = receiver->requestedConnections.allConnected;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    bool allInitialized = receiver->subscribers.allInitialized;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
+    while (running) {
+        if (!allConnected) {
+            psa_websocket_connectToAllRequestedConnections(receiver);
+        }
+        if (!allInitialized) {
+            psa_websocket_initializeAllSubscribers(receiver);
+        }
+
+        while(celix_arrayList_size(receiver->recvBuffer.list) > 0) {
+            celixThreadMutex_lock(&receiver->recvBuffer.mutex);
+            pubsub_websocket_msg_t *msg = (pubsub_websocket_msg_t *) 
celix_arrayList_get(receiver->recvBuffer.list, 0);
+            struct timespec *rcvTime = (struct timespec *) 
celix_arrayList_get(receiver->recvBuffer.rcvTimes, 0);
+            celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
+
+            processMsg(receiver, &msg->header, msg->payload, msg->payloadSize, 
rcvTime);
+            free(msg);
+            free(rcvTime);
+
+            celixThreadMutex_lock(&receiver->recvBuffer.mutex);
+            celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
+            celix_arrayList_removeAt(receiver->recvBuffer.rcvTimes, 0);
+            celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
+        }
+
+        celixThreadMutex_lock(&receiver->recvThread.mutex);
+        running = receiver->recvThread.running;
+        celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        allConnected = receiver->requestedConnections.allConnected;
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        allInitialized = receiver->subscribers.allInitialized;
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    } // while
+
+    return NULL;
+}
+
+static int psa_websocketTopicReceiver_data(struct mg_connection *connection 
__attribute__((unused)),
+                                            int op_code 
__attribute__((unused)),
+                                            char *data,
+                                            size_t length,
+                                            void *handle) {
+    //Received a websocket message, append this message to the buffer of the 
receiver.
+    if (handle != NULL) {
+        psa_websocket_requested_connection_entry_t *entry = 
(psa_websocket_requested_connection_entry_t *) handle;
+
+        pubsub_websocket_msg_t *rcvdMsg = malloc(length);
+        memcpy(rcvdMsg, data, length);
+
+        //Check if payload is completely received
+        unsigned long rcvdPayloadSize = length - sizeof(rcvdMsg->header) - 
sizeof(rcvdMsg->payloadSize);
+        if(rcvdMsg->payloadSize == rcvdPayloadSize) {
+            celixThreadMutex_lock(&entry->recvBuffer->mutex);
+            celix_arrayList_add(entry->recvBuffer->list, rcvdMsg);
+            struct timespec *receiveTime = malloc(sizeof(*receiveTime));
+            clock_gettime(CLOCK_REALTIME, receiveTime);
+            celix_arrayList_add(entry->recvBuffer->rcvTimes, receiveTime);
+            celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+        } else {
+            free(rcvdMsg);
+        }
+    }
+
+    return 1; //keep open (non-zero), 0 to close the socket
+}
+
+static void psa_websocketTopicReceiver_close(const struct mg_connection 
*connection __attribute__((unused)), void *handle) {
+    //Reset connection for this receiver entry
+    if (handle != NULL) {
+        psa_websocket_requested_connection_entry_t *entry = 
(psa_websocket_requested_connection_entry_t *) handle;
+        entry->connected = false;
+        entry->sockConnection = NULL;
+    }
+}
+
+pubsub_admin_receiver_metrics_t* 
pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t 
*receiver) {
+    pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->scope);
+    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->topic);
+
+    size_t msgTypesCount = 0;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hashMapIterator_nextValue(&iter2);
+            msgTypesCount += 1;
+        }
+    }
+
+    result->nrOfMsgTypes = (unsigned long)msgTypesCount;
+    result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
+    int i = 0;
+    iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+            result->msgTypes[i].origins = 
calloc((size_t)hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
+            int k = 0;
+            hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
+            while (hashMapIterator_hasNext(&iter3)) {
+                psa_websocket_subscriber_metrics_entry_t *metrics = 
hashMapIterator_nextValue(&iter3);
+                result->msgTypes[i].typeId = metrics->msgTypeId;
+                pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, 
(void*)(uintptr_t)metrics->msgTypeId);
+                if (msgSer) {
+                    snprintf(result->msgTypes[i].typeFqn, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+                    uuid_copy(result->msgTypes[i].origins[k].originUUID, 
metrics->origin);
+                    result->msgTypes[i].origins[k].nrOfMessagesReceived = 
metrics->nrOfMessagesReceived;
+                    result->msgTypes[i].origins[k].nrOfSerializationErrors = 
metrics->nrOfSerializationErrors;
+                    result->msgTypes[i].origins[k].averageDelayInSeconds = 
metrics->averageDelayInSeconds;
+                    result->msgTypes[i].origins[k].maxDelayInSeconds = 
metrics->maxDelayInSeconds;
+                    result->msgTypes[i].origins[k].minDelayInSeconds = 
metrics->minDelayInSeconds;
+                    
result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = 
metrics->averageTimeBetweenMessagesInSeconds;
+                    
result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = 
metrics->averageSerializationTimeInSeconds;
+                    result->msgTypes[i].origins[k].lastMessageReceived = 
metrics->lastMessageReceived;
+                    result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = 
metrics->nrOfMissingSeqNumbers;
+
+                    k += 1;
+                } else {
+                    L_WARN("[PSA_WEBSOCKET]: Error cannot find key 0x%X in msg 
map during metrics collection!\n", metrics->msgTypeId);
+                }
+            }
+            i +=1 ;
+        }
+    }
+
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+    return result;
+}
+
+
+static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
 *receiver) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (!receiver->requestedConnections.allConnected) {
+        bool allConnected = true;
+        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_websocket_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (!entry->connected) {
+                char errBuf[100] = {0};
+                entry->sockConnection = 
mg_connect_websocket_client(entry->socketAddress,
+                                                                    (int) 
entry->socketPort,
+                                                                    0, // No 
ssl
+                                                                    errBuf,
+                                                                    (size_t) 
sizeof(errBuf),
+                                                                    entry->uri,
+                                                                    NULL,
+                                                                    
psa_websocketTopicReceiver_data,
+                                                                    
psa_websocketTopicReceiver_close,
+                                                                    entry);
+                if(entry->sockConnection != NULL) {
+                    entry->connected = true;
+                    entry->connectRetryCount = 0;
+                } else {
+                    entry->connectRetryCount += 1;
+                    allConnected = false;
+                    if((entry->connectRetryCount % 10) == 0) {
+                        L_WARN("[PSA_WEBSOCKET] Error connecting to websocket 
%s:%li/%s. Error: %s",
+                               entry->socketAddress,
+                               entry->socketPort,
+                               entry->uri, errBuf);
+                    }
+                }
+            }
+        }
+        receiver->requestedConnections.allConnected = allConnected;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void 
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t 
*receiver) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (!receiver->subscribers.allInitialized) {
+        bool allInitialized = true;
+        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (!entry->initialized) {
+                int rc = 0;
+                if (entry->svc != NULL && entry->svc->init != NULL) {
+                    rc = entry->svc->init(entry->svc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
+                }
+            }
+        }
+        receiver->subscribers.allInitialized = allInitialized;
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
new file mode 100644
index 0000000..1a00cfa
--- /dev/null
+++ 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
+#define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
+
+#include <pubsub_admin_metrics.h>
+#include "celix_bundle_context.h"
+
+typedef struct pubsub_websocket_topic_receiver 
pubsub_websocket_topic_receiver_t;
+
+pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        const celix_properties_t *topicProperties,
+        long serializerSvcId,
+        pubsub_serializer_service_t *serializer);
+void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t 
*receiver);
+
+const char* 
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t 
*receiver);
+const char* 
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t 
*receiver);
+
+long 
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t 
*receiver);
+void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t 
*unconnectedUrls);
+
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort, const char *uri);
+void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t 
*receiver, const char *uri);
+
+
+pubsub_admin_receiver_metrics_t* 
pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t 
*receiver);
+
+
+#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
new file mode 100644
index 0000000..42f66c7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <zconf.h>
+#include <log_helper.h>
+#include "pubsub_websocket_topic_sender.h"
+#include "pubsub_psa_websocket_constants.h"
+#include "pubsub_websocket_common.h"
+#include <uuid/uuid.h>
+#include <constants.h>
+#include "http_admin/api.h"
+#include "civetweb.h"
+
+#define FIRST_SEND_DELAY_IN_SECONDS             2
+
+#define L_DEBUG(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) \
+    logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+struct pubsub_websocket_topic_sender {
+    celix_bundle_context_t *ctx;
+    log_helper_t *logHelper;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
+    uuid_t fwUUID;
+    bool metricsEnabled;
+
+    char *scope;
+    char *topic;
+    char scopeAndTopicFilter[5];
+    char *uri;
+
+    celix_websocket_service_t websockSvc;
+    long websockSvcId;
+    struct mg_connection *sockConnection;
+
+    struct {
+        long svcId;
+        celix_service_factory_t factory;
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map;  //key = bndId, value = 
psa_websocket_bounded_service_entry_t
+    } boundedServices;
+};
+
+typedef struct psa_websocket_send_msg_entry {
+    pubsub_websocket_msg_header_t header; //partially filled header (only 
seqnr and time needs to be updated per send)
+    pubsub_msg_serializer_t *msgSer;
+    celix_thread_mutex_t sendLock; //protects send & Seqnr
+    unsigned int seqNr;
+    struct {
+        celix_thread_mutex_t mutex; //protects entries in struct
+        unsigned long nrOfMessagesSend;
+        unsigned long nrOfMessagesSendFailed;
+        unsigned long nrOfSerializationErrors;
+        struct timespec lastMessageSend;
+        double averageTimeBetweenMessagesInSeconds;
+        double averageSerializationTimeInSeconds;
+    } metrics;
+} psa_websocket_send_msg_entry_t;
+
+typedef struct psa_websocket_bounded_service_entry {
+    pubsub_websocket_topic_sender_t *parent;
+    pubsub_publisher_t service;
+    long bndId;
+    hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+    hash_map_t *msgEntries; //key = msg type id, value = 
psa_websocket_send_msg_entry_t
+    int getCount;
+} psa_websocket_bounded_service_entry_t;
+
+
+static void* psa_websocket_getPublisherService(void *handle, const 
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_websocket_ungetPublisherService(void *handle, const 
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t 
*sender);
+
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int 
msgTypeId, const void *msg);
+
+static void psa_websocketTopicSender_ready(struct mg_connection *connection, 
void *handle);
+static void psa_websocketTopicSender_close(const struct mg_connection 
*connection, void *handle);
+
+pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        pubsub_serializer_service_t *ser) {
+    pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+    sender->serializerSvcId = serializerSvcId;
+    sender->serializer = ser;
+    psa_websocket_setScopeAndTopicFilter(scope, topic, 
sender->scopeAndTopicFilter);
+    const char* uuid = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (uuid != NULL) {
+        uuid_parse(uuid, sender->fwUUID);
+    }
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
+
+    sender->uri = psa_websocket_createURI(scope, topic);
+
+    if (sender->uri != NULL) {
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, WEBSOCKET_ADMIN_URI, sender->uri);
+
+        sender->websockSvc.handle = sender;
+        sender->websockSvc.ready = psa_websocketTopicSender_ready;
+        sender->websockSvc.close = psa_websocketTopicSender_close;
+        sender->websockSvcId = celix_bundleContext_registerService(ctx, 
&sender->websockSvc,
+                                                                   
WEBSOCKET_ADMIN_SERVICE_NAME, props);
+    } else {
+        sender->websockSvcId = -1;
+    }
+
+    if (sender->websockSvcId > 0) {
+        sender->scope = strndup(scope, 1024 * 1024);
+        sender->topic = strndup(topic, 1024 * 1024);
+
+        celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+        sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+    }
+
+    //register publisher services using a service factory
+    if (sender->websockSvcId > 0) {
+        sender->publisher.factory.handle = sender;
+        sender->publisher.factory.getService = 
psa_websocket_getPublisherService;
+        sender->publisher.factory.ungetService = 
psa_websocket_ungetPublisherService;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+        celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+        opts.factory = &sender->publisher.factory;
+        opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+        opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+        opts.properties = props;
+
+        sender->publisher.svcId = 
celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+    }
+
+    if (sender->websockSvcId < 0) {
+        free(sender);
+        sender = NULL;
+    }
+
+    return sender;
+}
+
+void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t 
*sender) {
+    if (sender != NULL) {
+        celix_bundleContext_unregisterService(sender->ctx, 
sender->publisher.svcId);
+
+        celixThreadMutex_lock(&sender->boundedServices.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (entry != NULL) {
+                
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
+
+                hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
+                    celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+                    free(msgEntry);
+
+                }
+                hashMap_destroy(entry->msgEntries, false, false);
+
+                free(entry);
+            }
+        }
+        hashMap_destroy(sender->boundedServices.map, false, false);
+        celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+        celixThreadMutex_destroy(&sender->boundedServices.mutex);
+
+        celix_bundleContext_unregisterService(sender->ctx, 
sender->websockSvcId);
+
+        free(sender->scope);
+        free(sender->topic);
+        free(sender->uri);
+        free(sender);
+    }
+}
+
+long 
pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t 
*sender) {
+    return sender->serializerSvcId;
+}
+
+const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t 
*sender) {
+    return sender->scope;
+}
+
+const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t 
*sender) {
+    return sender->topic;
+}
+
+const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t 
*sender) {
+    return sender->uri;
+}
+
+
+static void* psa_websocket_getPublisherService(void *handle, const 
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties 
__attribute__((unused))) {
+    pubsub_websocket_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_websocket_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount += 1;
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        entry->getCount = 1;
+        entry->parent = sender;
+        entry->bndId = bndId;
+        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+
+        int rc = 
sender->serializer->createSerializerMap(sender->serializer->handle, 
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
+        if (rc == 0) {
+            hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *hashMapEntry = 
hashMapIterator_nextEntry(&iter);
+                void *key = hashMapEntry_getKey(hashMapEntry);
+                psa_websocket_send_msg_entry_t *sendEntry = calloc(1, 
sizeof(*sendEntry));
+                sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
+                sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+                int major;
+                int minor;
+                version_getMajor(sendEntry->msgSer->msgVersion, &major);
+                version_getMinor(sendEntry->msgSer->msgVersion, &minor);
+                sendEntry->header.major = (uint8_t)major;
+                sendEntry->header.minor = (uint8_t)minor;
+                uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+                celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
+                hashMap_put(entry->msgEntries, key, sendEntry);
+            }
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = 
psa_websocket_localMsgTypeIdForMsgType;
+            entry->service.send = psa_websocket_topicPublicationSend;
+            hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
+        } else {
+            L_ERROR("Error creating serializer map for websocket TopicSender 
%s/%s", sender->scope, sender->topic);
+        }
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+    return &entry->service;
+}
+
+static void psa_websocket_ungetPublisherService(void *handle, const 
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties 
__attribute__((unused))) {
+    pubsub_websocket_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_websocket_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void*)bndId);
+    if (entry != NULL) {
+        entry->getCount -= 1;
+    }
+    if (entry != NULL && entry->getCount == 0) {
+        //free entry
+        hashMap_remove(sender->boundedServices.map, (void*)bndId);
+        int rc = 
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
+        if (rc != 0) {
+            L_ERROR("Error destroying publisher service, serializer not 
available / cannot get msg serializer map\n");
+        }
+
+        hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter);
+            celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+            free(msgEntry);
+        }
+        hashMap_destroy(entry->msgEntries, false, false);
+
+        free(entry);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+}
+
+pubsub_admin_sender_metrics_t* 
pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender) {
+    pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->scope);
+    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->topic);
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    size_t count = 0;
+    hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hashMapIterator_nextValue(&iter2);
+            count += 1;
+        }
+    }
+
+    result->msgMetrics = calloc(count, sizeof(*result));
+
+    iter = hashMapIterator_construct(sender->boundedServices.map);
+    int i = 0;
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter2)) {
+            psa_websocket_send_msg_entry_t *mEntry = 
hashMapIterator_nextValue(&iter2);
+            celixThreadMutex_lock(&mEntry->metrics.mutex);
+            result->msgMetrics[i].nrOfMessagesSend = 
mEntry->metrics.nrOfMessagesSend;
+            result->msgMetrics[i].nrOfMessagesSendFailed = 
mEntry->metrics.nrOfMessagesSendFailed;
+            result->msgMetrics[i].nrOfSerializationErrors = 
mEntry->metrics.nrOfSerializationErrors;
+            result->msgMetrics[i].averageSerializationTimeInSeconds = 
mEntry->metrics.averageSerializationTimeInSeconds;
+            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = 
mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+            result->msgMetrics[i].lastMessageSend = 
mEntry->metrics.lastMessageSend;
+            result->msgMetrics[i].bndId = entry->bndId;
+            result->msgMetrics[i].typeId = mEntry->header.type;
+            snprintf(result->msgMetrics[i].typeFqn, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
+            i += 1;
+            celixThreadMutex_unlock(&mEntry->metrics.mutex);
+        }
+    }
+
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    result->nrOfmsgMetrics = (int)count;
+    return result;
+}
+
+static int psa_websocket_topicPublicationSend(void* handle, unsigned int 
msgTypeId, const void *inMsg) {
+    int status = CELIX_SERVICE_EXCEPTION;
+    psa_websocket_bounded_service_entry_t *bound = handle;
+    pubsub_websocket_topic_sender_t *sender = bound->parent;
+    bool monitor = sender->metricsEnabled;
+    psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, 
(void *) (uintptr_t) (msgTypeId));
+
+    //metrics updates
+    struct timespec sendTime;
+    struct timespec serializationStart;
+    struct timespec serializationEnd;
+    //int unknownMessageCountUpdate = 0;
+    int sendErrorUpdate = 0;
+    int serializationErrorUpdate = 0;
+    int sendCountUpdate = 0;
+
+    if (sender->sockConnection != NULL && entry != NULL) {
+        delay_first_send_for_late_joiners(sender);
+
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &serializationStart);
+        }
+
+        void *serializedOutput = NULL;
+        size_t serializedOutputLen = 0;
+        status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
+
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &serializationEnd);
+        }
+
+        if (status == CELIX_SUCCESS /*ser ok*/) {
+            unsigned char *hdrEncoded = 
calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char));
+
+            celixThreadMutex_lock(&entry->sendLock);
+
+            pubsub_websocket_msg_t *msg = malloc(sizeof(*msg) + 
sizeof(char[serializedOutputLen]));
+            pubsub_websocket_msg_header_t *msgHdr = &entry->header;
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &sendTime);
+                msgHdr->sendtimeSeconds = (uint64_t) sendTime.tv_sec;
+                msgHdr->sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
+                msgHdr->seqNr++;
+            }
+            memcpy(&msg->header, msgHdr, 
sizeof(pubsub_websocket_msg_header_t));
+
+            msg->payloadSize = (unsigned int) serializedOutputLen;
+            size_t hdr_size = sizeof(msg->header);
+            size_t ps_size = sizeof(msg->payloadSize);
+            size_t bytes_to_write = hdr_size + ps_size + 
serializedOutputLen;//sizeof(*msg);
+            memcpy(msg->payload, serializedOutput, serializedOutputLen);
+            int bytes_written = 
mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, 
(char *) msg, bytes_to_write);
+
+            celixThreadMutex_unlock(&entry->sendLock);
+            if (bytes_written == (int) bytes_to_write) {
+                sendCountUpdate = 1;
+            } else {
+                sendErrorUpdate = 1;
+                L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket.");
+            }
+
+            free(msg);
+            free(hdrEncoded);
+            free(serializedOutput);
+        } else {
+            serializationErrorUpdate = 1;
+            L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for 
scope/topic %s/%s",
+                   entry->msgSer->msgName, sender->scope, sender->topic);
+        }
+    } else if (entry == NULL){
+        //unknownMessageCountUpdate = 1;
+        L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i 
for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+    }
+
+
+    if (monitor && status == CELIX_SUCCESS) {
+        celixThreadMutex_lock(&entry->metrics.mutex);
+
+        long n = entry->metrics.nrOfMessagesSend + 
entry->metrics.nrOfMessagesSendFailed;
+        double diff = celix_difftime(&serializationStart, &serializationEnd);
+        double average = (entry->metrics.averageSerializationTimeInSeconds * n 
+ diff) / (n+1);
+        entry->metrics.averageSerializationTimeInSeconds = average;
+
+        if (entry->metrics.nrOfMessagesSend > 2) {
+            diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
+            n = entry->metrics.nrOfMessagesSend;
+            average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n 
+ diff) / (n+1);
+            entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+        }
+
+        entry->metrics.lastMessageSend = sendTime;
+        entry->metrics.nrOfMessagesSend += sendCountUpdate;
+        entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
+        entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+
+        celixThreadMutex_unlock(&entry->metrics.mutex);
+    }
+
+    return status;
+}
+
+static void psa_websocketTopicSender_ready(struct mg_connection *connection, 
void *handle) {
+    //Connection succeeded so save connection to use for sending the messages
+    pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t 
*) handle;
+    sender->sockConnection = connection;
+}
+
+static void psa_websocketTopicSender_close(const struct mg_connection 
*connection __attribute__((unused)), void *handle) {
+    //Connection closed so reset connection
+    pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t 
*) handle;
+    sender->sockConnection = NULL;
+}
+
+static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t 
*sender) {
+
+    static bool firstSend = true;
+
+    if (firstSend) {
+        L_INFO("PSA_WEBSOCKET_TP: Delaying first send for late joiners...\n");
+        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        firstSend = false;
+    }
+}
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
new file mode 100644
index 0000000..35229a8
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
+#define CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
+
+#include "celix_bundle_context.h"
+#include "pubsub_admin_metrics.h"
+
+typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
+
+pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
+        celix_bundle_context_t *ctx,
+        log_helper_t *logHelper,
+        const char *scope,
+        const char *topic,
+        long serializerSvcId,
+        pubsub_serializer_service_t *ser);
+void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t 
*sender);
+
+const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t 
*sender);
+const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t 
*sender);
+const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t 
*sender);
+
+long 
pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t 
*sender);
+
+/**
+ * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every 
msg_type/bundle send with the topic sender.
+ */
+pubsub_admin_sender_metrics_t* 
pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender);
+
+#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
diff --git a/bundles/pubsub/test/CMakeLists.txt 
b/bundles/pubsub/test/CMakeLists.txt
index d37eb23..957745b 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -172,6 +172,27 @@ add_test(NAME pubsub_tcp_endpoint_tests COMMAND 
pubsub_tcp_endpoint_tests WORKIN
 SETUP_TARGET_FOR_COVERAGE(pubsub_tcp_endpoint_tests_cov 
pubsub_tcp_endpoint_tests 
${CMAKE_BINARY_DIR}/coverage/pubsub_tcp_endpoint_tests/pubsub_tcp_endpoint_tests
 ..)
 endif()
 
+add_celix_container(pubsub_websocket_tests
+        USE_CONFIG
+        LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+        DIR ${CMAKE_CURRENT_BINARY_DIR}
+        PROPERTIES
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            USE_WEBSOCKETS=true
+            LISTENING_PORTS=8080
+        BUNDLES
+            Celix::pubsub_serializer_json
+            Celix::http_admin
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_websocket
+            pubsub_sut
+            pubsub_tst
+)
+target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api 
${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi)
+target_include_directories(pubsub_websocket_tests PRIVATE 
${CPPUTEST_INCLUDE_DIR})
+add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests 
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>)
+SETUP_TARGET_FOR_COVERAGE(pubsub_websocket_tests_cov pubsub_websocket_tests 
${CMAKE_BINARY_DIR}/coverage/pubsub_websocket_tests/pubsub_websocket_tests ..)
+
 if (BUILD_PUBSUB_PSA_ZMQ)
     add_celix_container(pubsub_zmq_tests
             USE_CONFIG #ensures that a config.properties will be created with 
the launch bundles.
diff --git a/bundles/pubsub/test/meta_data/ping.properties 
b/bundles/pubsub/test/meta_data/ping.properties
index b73435d..2e13c17 100644
--- a/bundles/pubsub/test/meta_data/ping.properties
+++ b/bundles/pubsub/test/meta_data/ping.properties
@@ -20,6 +20,7 @@ tcp.static.bind.url=tcp://127.0.0.1:9000
 tcp.static.connect.urls=tcp://127.0.0.1:9000
 udpmc.static.bind.port=50678
 udpmc.static.connect.socket_addresses=224.100.0.1:50678
+websocket.static.connect.socket_addresses=127.0.0.1:8080
 
 #note only effective if run as root
 thread.realtime.shed=SCHED_FIFO

Reply via email to