This is an automated email from the ASF dual-hosted git repository.
oipo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/master by this push:
new 3df528d Feature/pubsub websocket serialization (#338)
3df528d is described below
commit 3df528d41c6a49a377514e1360f1bcb7b39cebd3
Author: Michael de Lang <[email protected]>
AuthorDate: Fri Apr 23 14:38:20 2021 +0200
Feature/pubsub websocket serialization (#338)
* Fix serializing/deserializing json for pubsub
* Add service ranking to wire protocols and make endpoint matching print
errors if match is mandatory but cannot be made
* Fix memory leak in RSA
* Move current pubsub websocket implementation to v1
* Reset v1 to original
* Add v2 websocket
---
bundles/pubsub/CMakeLists.txt | 3 +-
bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt | 2 +-
.../v2/src/pubsub_tcp_topic_receiver.c | 1 -
.../pubsub_admin_websocket/{ => v1}/CMakeLists.txt | 0
.../{ => v1}/src/psa_activator.c | 0
.../{ => v1}/src/pubsub_psa_websocket_constants.h | 0
.../{ => v1}/src/pubsub_websocket_admin.c | 0
.../{ => v1}/src/pubsub_websocket_admin.h | 0
.../{ => v1}/src/pubsub_websocket_common.c | 0
.../{ => v1}/src/pubsub_websocket_common.h | 0
.../{ => v1}/src/pubsub_websocket_topic_receiver.c | 0
.../{ => v1}/src/pubsub_websocket_topic_receiver.h | 0
.../{ => v1}/src/pubsub_websocket_topic_sender.c | 0
.../{ => v1}/src/pubsub_websocket_topic_sender.h | 0
.../pubsub_admin_websocket/{ => v2}/CMakeLists.txt | 20 +-
.../{ => v2}/src/psa_activator.c | 6 +-
.../{ => v2}/src/pubsub_psa_websocket_constants.h | 0
.../{ => v2}/src/pubsub_websocket_admin.c | 267 +++++++++++++--------
.../{ => v2}/src/pubsub_websocket_admin.h | 12 +
.../{ => v2}/src/pubsub_websocket_common.c | 0
.../{ => v2}/src/pubsub_websocket_common.h | 0
.../{ => v2}/src/pubsub_websocket_topic_receiver.c | 138 +++++------
.../{ => v2}/src/pubsub_websocket_topic_receiver.h | 6 +-
.../{ => v2}/src/pubsub_websocket_topic_sender.c | 121 +++++-----
.../{ => v2}/src/pubsub_websocket_topic_sender.h | 7 +-
bundles/pubsub/test/CMakeLists.txt | 74 +++++-
26 files changed, 388 insertions(+), 269 deletions(-)
diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt
index 1a780a2..3a66b25 100644
--- a/bundles/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/CMakeLists.txt
@@ -38,7 +38,8 @@ if (PUBSUB)
option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON)
if (BUILD_PUBSUB_PSA_WS)
- add_subdirectory(pubsub_admin_websocket)
+ add_subdirectory(pubsub_admin_websocket/v1)
+ add_subdirectory(pubsub_admin_websocket/v2)
endif (BUILD_PUBSUB_PSA_WS)
add_subdirectory(pubsub_api)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
b/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
index f7455da..117e85a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/CMakeLists.txt
@@ -32,7 +32,7 @@ add_celix_bundle(celix_pubsub_admin_tcp_v2
set_target_properties(celix_pubsub_admin_tcp_v2 PROPERTIES INSTALL_RPATH
"$ORIGIN")
target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::pubsub_spi
Celix::pubsub_utils)
-target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::framework
Celix::dfi Celix::log_helper)
+target_link_libraries(celix_pubsub_admin_tcp_v2 PRIVATE Celix::framework
Celix::log_helper)
target_include_directories(celix_pubsub_admin_tcp_v2 PRIVATE src)
# cmake find package UUID set the wrong include dir for OSX
if (NOT APPLE)
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 2990029..853e49d 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -23,7 +23,6 @@
#include <unistd.h>
#include <pubsub/subscriber.h>
#include <memory.h>
-#include <pubsub_constants.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
#include "pubsub_tcp_handler.h"
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
copy to bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
b/bundles/pubsub/pubsub_admin_websocket/v1/src/psa_activator.c
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/psa_activator.c
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_psa_websocket_constants.h
similarity index 100%
copy from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
copy to
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_psa_websocket_constants.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.c
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.c
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.h
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_admin.h
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.c
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.c
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.h
similarity index 100%
copy from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
copy to bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_common.h
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
similarity index 100%
copy from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
copy to
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.h
similarity index 100%
copy from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
copy to
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.h
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.c
similarity index 100%
copy from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
copy to
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.c
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.h
similarity index 100%
copy from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
copy to
bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_sender.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
b/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
similarity index 60%
rename from bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
rename to bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
index f6397e0..6557ddb 100644
--- a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/CMakeLists.txt
@@ -18,8 +18,8 @@
find_package(Jansson REQUIRED)
find_package(UUID REQUIRED)
-add_celix_bundle(celix_pubsub_admin_websocket
- BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket"
+add_celix_bundle(celix_pubsub_admin_websocket_v2
+ BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket_v2"
VERSION "1.0.0"
GROUP "Celix/PubSub"
SOURCES
@@ -30,16 +30,16 @@ add_celix_bundle(celix_pubsub_admin_websocket
src/pubsub_websocket_common.c
)
-set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH
"$ORIGIN")
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE
- Celix::framework Celix::dfi Celix::log_helper Celix::utils
+set_target_properties(celix_pubsub_admin_websocket_v2 PROPERTIES INSTALL_RPATH
"$ORIGIN")
+target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE
+ Celix::framework Celix::log_helper Celix::utils
Celix::http_admin_api
)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::pubsub_spi
Celix::pubsub_utils )
-target_include_directories(celix_pubsub_admin_websocket PRIVATE
+target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE
Celix::pubsub_spi Celix::pubsub_utils )
+target_include_directories(celix_pubsub_admin_websocket_v2 PRIVATE
src
)
-install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT
pubsub)
-target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api)
-add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket)
+install_celix_bundle(celix_pubsub_admin_websocket_v2 EXPORT celix COMPONENT
pubsub)
+target_link_libraries(celix_pubsub_admin_websocket_v2 PRIVATE Celix::shell_api)
+add_library(Celix::pubsub_admin_websocket_v2 ALIAS
celix_pubsub_admin_websocket_v2)
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
similarity index 96%
rename from bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
index 2f4b8cc..159d8ed 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
@@ -18,6 +18,7 @@
*/
#include <stdlib.h>
+#include <pubsub_message_serialization_service.h>
#include "celix_api.h"
#include "pubsub_serializer.h"
@@ -47,7 +48,7 @@ int psa_websocket_start(psa_websocket_activator_t *act,
celix_bundle_context_t *
act->cmdSvcId = -1L;
act->serializersTrackerId = -1L;
- act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_websocket");
+ act->logHelper = celix_logHelper_create(ctx,
"celix_psa_admin_websocket_v2");
act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS :
CELIX_BUNDLE_EXCEPTION;
@@ -55,8 +56,7 @@ int psa_websocket_start(psa_websocket_activator_t *act,
celix_bundle_context_t *
//track serializers (only json)
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts =
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
- opts.filter.filter = "(pubsub.serializer=json)";
+ opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter.ignoreServiceLanguage = true;
opts.callbackHandle = act->admin;
opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_psa_websocket_constants.h
similarity index 100%
rename from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h
rename to
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_psa_websocket_constants.h
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
similarity index 68%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
index 9c4633a..8950fda 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
@@ -21,6 +21,8 @@
#include <pubsub_endpoint.h>
#include <pubsub_serializer.h>
#include <ip_utils.h>
+#include <pubsub_message_serialization_service.h>
+#include <pubsub_matching.h>
#include "pubsub_utils.h"
#include "pubsub_websocket_admin.h"
@@ -50,7 +52,7 @@ struct pubsub_websocket_admin {
bool verbose;
struct {
- celix_thread_mutex_t mutex;
+ celix_thread_rwlock_t mutex;
hash_map_t *map; //key = svcId, value =
psa_websocket_serializer_entry_t*
} serializers;
@@ -71,16 +73,15 @@ struct pubsub_websocket_admin {
};
-typedef struct psa_websocket_serializer_entry {
- const char *serType;
- long svcId;
- pubsub_serializer_service_t *svc;
-} psa_websocket_serializer_entry_t;
-
static celix_status_t
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa,
pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t
*endpoint);
static celix_status_t
pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t*
psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t
*endpoint);
+// Service "use" callback (installed as opts.useWithProperties by the
+// setupTopicSender/setupTopicReceiver code in this file).
+// handle: points to a `const char*` that receives the result.
+// svc:    unused.
+// props:  properties of the used service; the serialization-type property is
+//         read from it, with NULL as the fallback when it is absent.
+// NOTE(review): the pointer written to *handle is the value returned by
+// celix_properties_get(), not a copy; presumably it is owned by `props` and
+// may not outlive the service registration - confirm before relying on it
+// after the use-call has returned.
+static void pubsub_websocketAdmin_getSerType(void *handle, void *svc
__attribute__((unused)), const celix_properties_t* props) {
+ const char** out = handle;
+ *out = celix_properties_get(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
+}
+
pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t
*ctx, celix_log_helper_t *logHelper) {
pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
@@ -92,8 +93,8 @@ pubsub_websocket_admin_t*
pubsub_websocketAdmin_create(celix_bundle_context_t *c
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx,
PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx,
PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadMutex_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL);
+ celixThreadRwlock_create(&psa->serializers.mutex, NULL);
+ psa->serializers.map = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
@@ -112,8 +113,7 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t
*psa) {
return;
}
- //note assuming al psa register services and service tracker are removed.
-
+ //note assuming all psa register services and service tracker are removed.
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter =
hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -138,13 +138,13 @@ void
pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadRwlock_writeLock(&psa->serializers.mutex);
iter = hashMapIterator_construct(psa->serializers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_serializer_entry_t *entry =
hashMapIterator_nextValue(&iter);
free(entry);
}
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -155,7 +155,7 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t
*psa) {
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadMutex_destroy(&psa->serializers.mutex);
+ celixThreadRwlock_destroy(&psa->serializers.mutex);
hashMap_destroy(psa->serializers.map, false, false);
free(psa);
@@ -164,29 +164,48 @@ void
pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const
celix_properties_t *props) {
pubsub_websocket_admin_t *psa = handle;
- const char *serType = celix_properties_get(props,
PUBSUB_SERIALIZER_TYPE_KEY, NULL);
- long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID,
-1L);
+ const char *serType = celix_properties_get(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
+ long msgId = celix_properties_getAsLong(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
+ const char *msgFqn = celix_properties_get(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
+ const char *msgVersion = celix_properties_get(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
+
+ if (serType == NULL || msgId == -1L || msgFqn == NULL) {
+ L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of
the following properties: %s or %s or %s",
+
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
- if (serType == NULL) {
- L_INFO("[PSA_WEBSOCKET] Ignoring serializer service without %s
property", PUBSUB_SERIALIZER_TYPE_KEY);
+ L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn
%s", serType, msgId, msgFqn);
return;
}
-
- celixThreadMutex_lock(&psa->serializers.mutex);
- psa_websocket_serializer_entry_t *entry =
hashMap_get(psa->serializers.map, (void*)svcId);
+ L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s",
serType, msgId, msgFqn);
+
+ celixThreadRwlock_writeLock(&psa->serializers.mutex);
+ hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
+ if(typeEntries == NULL) {
+ typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
+ hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType),
typeEntries);
+ L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s",
psa->serializers.map, serType);
+ }
+ psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries,
(void*)msgId);
if (entry == NULL) {
- entry = calloc(1, sizeof(*entry));
- entry->serType = serType;
- entry->svcId = svcId;
+ entry = calloc(1, sizeof(psa_websocket_serializer_entry_t));
entry->svc = svc;
- hashMap_put(psa->serializers.map, (void*)svcId, entry);
+ entry->fqn = celix_utils_strdup(msgFqn);
+ entry->version = celix_utils_strdup(msgVersion);
+ hashMap_put(typeEntries, (void*)msgId, entry);
+ L_INFO("[PSA_WEBSOCKET_V2] entry added");
}
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
}
void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const
celix_properties_t *props) {
pubsub_websocket_admin_t *psa = handle;
- long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID,
-1L);
+ const char *serType = celix_properties_get(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
+ long msgId = celix_properties_getAsLong(props,
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
+
+ if(serType == NULL || msgId == -1) {
+ L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i",
serType, msgId);
+ return;
+ }
//remove serializer
// 1) First find entry and
@@ -194,48 +213,59 @@ void pubsub_websocketAdmin_removeSerializerSvc(void
*handle, void *svc, const ce
// 3) loop and destroy all topic receivers using the serializer
// Note that it is the responsibility of the topology manager to create
new topic senders/receivers
- celixThreadMutex_lock(&psa->serializers.mutex);
- psa_websocket_serializer_entry_t *entry =
hashMap_remove(psa->serializers.map, (void*)svcId);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadRwlock_writeLock(&psa->serializers.mutex);
+ hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
+ if(typeEntries != NULL) {
+ psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries,
(void*)msgId);
+ free((void*)entry->fqn);
+ free((void*)entry->version);
+ free(entry);
- if (entry != NULL) {
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter =
hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_websocket_topic_sender_t *sender =
hashMapEntry_getValue(senderEntry);
- if (sender != NULL && entry->svcId ==
pubsub_websocketTopicSender_serializerSvcId(sender)) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_websocketTopicSender_destroy(sender);
- free(key);
+ // check if there are no remaining serializers for the given type. If
not, remove all senders and receivers for this type.
+ if(hashMap_size(typeEntries) == 0) {
+ hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map,
serType), true, false);
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
+
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ hash_map_iterator_t iter =
hashMapIterator_construct(psa->topicSenders.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry =
hashMapIterator_nextEntry(&iter);
+ pubsub_websocket_topic_sender_t *sender =
hashMapEntry_getValue(senderEntry);
+ if (sender != NULL && strncmp(serType,
pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_websocketTopicSender_destroy(sender);
+ free(key);
+ }
}
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_websocket_topic_receiver_t *receiver =
hashMapEntry_getValue(senderEntry);
- if (receiver != NULL && entry->svcId ==
pubsub_websocketTopicReceiver_serializerSvcId(receiver)) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_websocketTopicReceiver_destroy(receiver);
- free(key);
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ iter = hashMapIterator_construct(psa->topicReceivers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *senderEntry =
hashMapIterator_nextEntry(&iter);
+ pubsub_websocket_topic_receiver_t *receiver =
hashMapEntry_getValue(senderEntry);
+ if (receiver != NULL && strncmp(serType,
pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
+ char *key = hashMapEntry_getKey(senderEntry);
+ hashMapIterator_remove(&iter);
+ pubsub_websocketTopicReceiver_destroy(receiver);
+ free(key);
+ }
}
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ } else {
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
}
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
- free(entry);
+ } else {
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
}
}
celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long
svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t
**topicProperties, double *outScore, long *outSerializerSvcId, long
*outProtocolSvcId) {
pubsub_websocket_admin_t *psa = handle;
- L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher");
+ L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsubEndpoint_matchPublisher(psa->ctx, svcRequesterBndId,
svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+ double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId,
svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE,
psa->qosSampleScore,
psa->qosControlScore, psa->defaultScore,
false, topicProperties,
outSerializerSvcId, outProtocolSvcId);
*outScore = score;
@@ -245,9 +275,9 @@ celix_status_t pubsub_websocketAdmin_matchPublisher(void
*handle, long svcReques
celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long
svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t
**topicProperties, double *outScore, long *outSerializerSvcId, long
*outProtocolSvcId) {
pubsub_websocket_admin_t *psa = handle;
- L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber");
+ L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchSubscriber");
celix_status_t status = CELIX_SUCCESS;
- double score = pubsubEndpoint_matchSubscriber(psa->ctx, svcProviderBndId,
svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+ double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId,
svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE,
psa->qosSampleScore,
psa->qosControlScore, psa->defaultScore,
false, topicProperties,
outSerializerSvcId, outProtocolSvcId);
if (outScore != NULL) {
@@ -258,9 +288,9 @@ celix_status_t pubsub_websocketAdmin_matchSubscriber(void
*handle, long svcProvi
celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle,
const celix_properties_t *endpoint, bool *outMatch) {
pubsub_websocket_admin_t *psa = handle;
- L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint");
+ L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchEndpoint");
celix_status_t status = CELIX_SUCCESS;
- bool match = pubsubEndpoint_match(psa->ctx, psa->log, endpoint,
PUBSUB_WEBSOCKET_ADMIN_TYPE, false, NULL, NULL);
+ bool match = pubsub_utils_matchEndpoint(psa->ctx, psa->log, endpoint,
PUBSUB_WEBSOCKET_ADMIN_TYPE, false, NULL, NULL);
if (outMatch != NULL) {
*outMatch = match;
}
@@ -280,17 +310,23 @@ celix_status_t
pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&psa->serializers.mutex);
+ //get serializer type
+ const char *serType = NULL;
+ celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+ opts.callbackHandle = &serType;
+ opts.useWithProperties = pubsub_websocketAdmin_getSerType;
+ opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+ char filter[32];
+ snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID,
serializerSvcId);
+ opts.filter.filter = filter;
+ celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_websocket_topic_sender_t *sender =
hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
- psa_websocket_serializer_entry_t *serEntry =
hashMap_get(psa->serializers.map, (void*)serializerSvcId);
- if (serEntry != NULL) {
- sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log,
scope, topic, serializerSvcId, serEntry->svc);
- }
+ sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope,
topic, serType, psa);
if (sender != NULL) {
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
- const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
serType, NULL, NULL);
@@ -304,15 +340,14 @@ celix_status_t
pubsub_websocketAdmin_setupTopicSender(void *handle, const char *
}
hashMap_put(psa->topicSenders.map, key, sender);
} else {
- L_ERROR("[PSA_WEBSOCKET] Error creating a TopicSender");
+ L_ERROR("[PSA_WEBSOCKET_V2] Error creating a TopicSender");
free(key);
}
} else {
free(key);
- L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicSender for
scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
+ L_ERROR("[PSA_WEBSOCKET_V2] Cannot setup already existing TopicSender
for scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
@@ -337,7 +372,7 @@ celix_status_t
pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
free(mapKey);
pubsub_websocketTopicSender_destroy(sender);
} else {
- L_ERROR("[PSA_WEBSOCKET] Cannot teardown TopicSender with scope/topic
%s/%s. Does not exists", scope == NULL ? "(null)" : scope, topic);
+ L_ERROR("[PSA_WEBSOCKET_V2] Cannot teardown TopicSender with
scope/topic %s/%s. Does not exists", scope == NULL ? "(null)" : scope, topic);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
free(key);
@@ -350,20 +385,24 @@ celix_status_t
pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
celix_properties_t *newEndpoint = NULL;
+ //get serializer type
+ const char *serType = NULL;
+ celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+ opts.callbackHandle = &serType;
+ opts.useWithProperties = pubsub_websocketAdmin_getSerType;
+ opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+ char filter[32];
+ snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID,
serializerSvcId);
+ opts.filter.filter = filter;
+ celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- celixThreadMutex_lock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_websocket_topic_receiver_t *receiver =
hashMap_get(psa->topicReceivers.map, key);
if (receiver == NULL) {
- psa_websocket_serializer_entry_t *serEntry =
hashMap_get(psa->serializers.map, (void*)serializerSvcId);
- if (serEntry != NULL) {
- receiver = pubsub_websocketTopicReceiver_create(psa->ctx,
psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc);
- } else {
- L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicSender
%s/%s", scope == NULL ? "(null)" : scope, topic);
- }
+ receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log,
scope, topic, topicProperties, serType, psa);
if (receiver != NULL) {
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
- const char *serType = serEntry->serType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
@@ -377,15 +416,14 @@ celix_status_t
pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
}
hashMap_put(psa->topicReceivers.map, key, receiver);
} else {
- L_ERROR("[PSA_WEBSOCKET] Error creating a TopicReceiver.");
+ L_ERROR("[PSA_WEBSOCKET_V2] Error creating a TopicReceiver.");
free(key);
}
} else {
free(key);
- L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicReceiver
for scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
+ L_ERROR("[PSA_WEBSOCKET_V2] Cannot setup already existing
TopicReceiver for scope/topic %s/%s!", scope == NULL ? "(null)" : scope, topic);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
if (receiver != NULL && newEndpoint != NULL) {
celixThreadMutex_lock(&psa->discoveredEndpoints.mutex);
@@ -540,19 +578,62 @@ celix_status_t
pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
return status;
}
+// Looks up the serializer entry stored for (serializationType, msgId).
+// handle:            the pubsub_websocket_admin_t instance.
+// serializationType: key into psa->serializers.map (outer map).
+// msgId:             key into the per-type map (inner map).
+// Returns the entry, or NULL when no map exists for the type or the inner
+// lookup yields nothing.
+// Locking: takes the read lock on psa->serializers.mutex and returns WITHOUT
+// unlocking it on every path, including the NULL path. The only unlock for
+// this lock on that path is pubsub_websocketAdmin_releaseSerializer().
+// NOTE(review): presumably every caller must call releaseSerializer() even
+// when NULL is returned, otherwise the read lock stays held - confirm against
+// the topic sender/receiver call sites.
+psa_websocket_serializer_entry_t*
pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char
*serializationType, uint32_t msgId) {
+ pubsub_websocket_admin_t *psa = handle;
+ psa_websocket_serializer_entry_t *serializer = NULL;
+
+ celixThreadRwlock_readLock(&psa->serializers.mutex);
+ hash_map_t *typeEntries = hashMap_get(psa->serializers.map,
serializationType);
+ if(typeEntries != NULL) {
+ serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
+ }
+
+ return serializer;
+}
+
+// Unlocks psa->serializers.mutex.
+// handle:     the pubsub_websocket_admin_t instance.
+// serializer: not used by this function.
+// NOTE(review): presumably the counterpart of the read lock left held by
+// pubsub_websocketAdmin_acquireSerializerForMessageId() - confirm that calls
+// are always paired one-to-one.
+void pubsub_websocketAdmin_releaseSerializer(void *handle,
psa_websocket_serializer_entry_t* serializer) {
+ pubsub_websocket_admin_t *psa = handle;
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
+}
+
+// Resolves the message id registered for a fully-qualified message name.
+// handle:            the pubsub_websocket_admin_t instance.
+// serializationType: key into psa->serializers.map (outer map).
+// fqn:               message name compared against each entry's fqn.
+// Returns the matching message id (the inner-map key, narrowed through
+// uint32_t exactly as it is stored), or -1 when the type has no map or no
+// entry matches. The serializers read lock is held only for the lookup.
+int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
+    pubsub_websocket_admin_t *psa = handle;
+    int64_t id = -1L;
+
+    celixThreadRwlock_readLock(&psa->serializers.mutex);
+    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
+    if(typeEntries != NULL) {
+        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
+        while(hashMapIterator_hasNext(&iterator)) {
+            // Walk entries directly: key and value come from the same map entry,
+            // so no second hashMap_get() per key is needed.
+            hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iterator);
+            void *key = hashMapEntry_getKey(mapEntry);
+            psa_websocket_serializer_entry_t *entry = hashMapEntry_getValue(mapEntry);
+            L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
+            if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
+                id = (uint32_t)(uintptr_t)key;
+                break;
+            }
+        }
+    } else {
+        L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
+    }
+    celixThreadRwlock_unlock(&psa->serializers.mutex);
+
+    // id is int64_t: pass it as long long with a matching length modifier
+    // instead of %i, which expects a plain int in the varargs list.
+    L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn %p %s %s = %lli", psa->serializers.map, serializationType, fqn, (long long)id);
+
+    return id;
+}
+
bool pubsub_websocketAdmin_executeCommand(void *handle, const char
*commandLine __attribute__((unused)), FILE *out, FILE *errStream
__attribute__((unused))) {
pubsub_websocket_admin_t *psa = handle;
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
- celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadRwlock_writeLock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter =
hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_websocket_topic_sender_t *sender =
hashMapIterator_nextValue(&iter);
- long serSvcId = pubsub_websocketTopicSender_serializerSvcId(sender);
- psa_websocket_serializer_entry_t *serEntry =
hashMap_get(psa->serializers.map, (void*)serSvcId);
- const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ const char *serType =
pubsub_websocketTopicSender_serializerType(sender);
const char *scope = pubsub_websocketTopicSender_scope(sender);
const char *topic = pubsub_websocketTopicSender_topic(sender);
const char *url = pubsub_websocketTopicSender_url(sender);
@@ -561,18 +642,16 @@ bool pubsub_websocketAdmin_executeCommand(void *handle,
const char *commandLine
fprintf(out, " |- url = %s\n", url);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
- celixThreadMutex_lock(&psa->serializers.mutex);
+ celixThreadRwlock_writeLock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
pubsub_websocket_topic_receiver_t *receiver =
hashMapIterator_nextValue(&iter);
- long serSvcId =
pubsub_websocketTopicReceiver_serializerSvcId(receiver);
- psa_websocket_serializer_entry_t *serEntry =
hashMap_get(psa->serializers.map, (void*)serSvcId);
- const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
+ const char *serType =
pubsub_websocketTopicReceiver_serializerType(receiver);
const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
const char *urlEndp = pubsub_websocketTopicReceiver_url(receiver);
@@ -598,7 +677,7 @@ bool pubsub_websocketAdmin_executeCommand(void *handle,
const char *commandLine
celix_arrayList_destroy(unconnected);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- celixThreadMutex_unlock(&psa->serializers.mutex);
+ celixThreadRwlock_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
return true;
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.h
similarity index 83%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.h
index bae6333..960c266 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.h
@@ -20,13 +20,21 @@
#ifndef CELIX_PUBSUB_WEBSOCKET_ADMIN_H
#define CELIX_PUBSUB_WEBSOCKET_ADMIN_H
+#include <stdint.h>
#include <pubsub_admin_metrics.h>
#include "celix_api.h"
#include "celix_log_helper.h"
#include "pubsub_psa_websocket_constants.h"
+#include <pubsub_message_serialization_service.h>
typedef struct pubsub_websocket_admin pubsub_websocket_admin_t;
+//Per message type serializer entry, stored in the admin's serializers map.
+typedef struct psa_websocket_serializer_entry {
+ const char *fqn; //message fqn; matched against the requested fqn when resolving a message id
+ const char *version; //version string; parsed with celix_version_createVersionFromString by sender and receiver
+ pubsub_message_serialization_service_t *svc; //service whose serialize/deserialize functions are called for this message type
+} psa_websocket_serializer_entry_t;
+
pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t
*ctx, celix_log_helper_t *logHelper);
void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa);
@@ -46,6 +54,10 @@ celix_status_t
pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const
celix_properties_t *props);
void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const
celix_properties_t *props);
+psa_websocket_serializer_entry_t*
pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char
*serializationType, uint32_t msgId);
+void pubsub_websocketAdmin_releaseSerializer(void *handle,
psa_websocket_serializer_entry_t* serializer);
+int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const
char *serializationType, const char *fqn);
+
bool pubsub_websocketAdmin_executeCommand(void *handle, const char
*commandLine, FILE *outStream, FILE *errStream);
#endif //CELIX_PUBSUB_WEBSOCKET_ADMIN_H
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.c
similarity index 100%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.c
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
similarity index 100%
rename from bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
rename to bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
similarity index 87%
rename from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
rename to
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index 7d8cd00..c93f078 100644
---
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -30,6 +30,7 @@
#include "pubsub_websocket_topic_receiver.h"
#include "pubsub_psa_websocket_constants.h"
#include "pubsub_websocket_common.h"
+#include "pubsub_websocket_admin.h"
#include <uuid/uuid.h>
#include <http_admin/api.h>
@@ -64,10 +65,10 @@ typedef struct pubsub_websocket_msg_entry {
struct pubsub_websocket_topic_receiver {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
+ void *admin;
char *scope;
char *topic;
+ char *serType;
char scopeAndTopicFilter[5];
char *uri;
@@ -109,7 +110,6 @@ typedef struct psa_websocket_requested_connection_entry {
} psa_websocket_requested_connection_entry_t;
typedef struct psa_websocket_subscriber_entry {
- hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
hash_map_t *subscriberServices; //key = servide id, value =
pubsub_subscriber_t*
bool initialized; //true if the init function is called through the
receive thread
} psa_websocket_subscriber_entry_t;
@@ -120,7 +120,6 @@ static void
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
static void* psa_websocket_recvThread(void * data);
static void
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
*receiver);
static void
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t
*receiver);
-static void *psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t
*msg_type_id_map);
static void psa_websocketTopicReceiver_ready(struct mg_connection *connection,
void *handle);
static int psa_websocketTopicReceiver_data(struct mg_connection *connection,
int op_code, char *data, size_t length, void *handle);
@@ -132,15 +131,15 @@ pubsub_websocket_topic_receiver_t*
pubsub_websocketTopicReceiver_create(celix_bu
const char
*scope,
const char
*topic,
const
celix_properties_t *topicProperties,
- long
serializerSvcId,
-
pubsub_serializer_service_t *serializer) {
+ const char
*serType,
+ void *admin) {
pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serializerSvcId = serializerSvcId;
- receiver->serializer = serializer;
+ receiver->serType = celix_utils_strdup(serType);
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
+ receiver->admin = admin;
psa_websocket_setScopeAndTopicFilter(scope, topic,
receiver->scopeAndTopicFilter);
receiver->uri = psa_websocket_createURI(scope, topic);
@@ -266,7 +265,6 @@ void
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
if (entry != NULL) {
-
receiver->serializer->destroySerializerMap(receiver->serializer->handle,
entry->msgTypes);
hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
@@ -311,6 +309,7 @@ void
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
free(receiver->uri);
free(receiver->scope);
free(receiver->topic);
+ free(receiver->serType);
}
free(receiver);
}
@@ -325,8 +324,8 @@ const char*
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t
return receiver->uri;
}
-long
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t
*receiver) {
- return receiver->serializerSvcId;
+//Returns the receiver's serialization type string (receiver->serType); the receiver frees it in destroy.
+const char
*pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t
*receiver) {
+ return receiver->serType;
}
void
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t
*unconnectedUrls) {
@@ -417,24 +416,14 @@ static void
pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_websocket_subscriber_entry_t *entry =
hashMap_get(receiver->subscribers.map, (void*)bndId);
- if (entry != NULL) {
- hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- } else {
+ if (entry == NULL) {
//new create entry
entry = calloc(1, sizeof(*entry));
entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
- hashMap_put(entry->subscriberServices, (void*)svcId, svc);
-
- int rc =
receiver->serializer->createSerializerMap(receiver->serializer->handle,
(celix_bundle_t*)bnd, &entry->msgTypes);
-
- if (rc == 0) {
- hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
- } else {
- L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for
TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
- free(entry);
- }
+ hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
}
+ hashMap_put(entry->subscriberServices, (void*)svcId, svc);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
@@ -453,74 +442,67 @@ static void
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void*)bndId);
- int rc =
receiver->serializer->destroySerializerMap(receiver->serializer->handle,
entry->msgTypes);
- if (rc != 0) {
- L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for
TopicReceiver %s/%s", receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
- }
hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static void * psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t
*msg_type_id_map) {
- void *msgTypeId = NULL;
- if(fqn != NULL && msg_type_id_map != NULL) {
- hash_map_iterator_t iter = hashMapIterator_construct(msg_type_id_map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
- pubsub_msg_serializer_t *serializer = hashMapEntry_getValue(entry);
- if(strcmp(serializer->msgName, fqn) == 0) {
- msgTypeId = hashMapEntry_getKey(entry);
- return msgTypeId;
- }
- }
+/**
+ * Deserializes one received payload and hands it to every subscriber service of entry.
+ *
+ * Steps: resolve the message id for hdr->id, acquire the matching serializer entry,
+ * check the header version against the serializer version, deserialize the payload and
+ * call receive on each subscriber. When a subscriber keeps the message (release == false)
+ * and more subscribers follow, the payload is deserialized again. The serializer is
+ * released on every path after it was acquired.
+ */
+static inline void
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver,
psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr,
const char* payload, size_t payloadSize) {
+ //NOTE receiver->subscribers.mutex locked
+
+ int64_t msgTypeId =
pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin,
receiver->serType, hdr->id);
+
+ if(msgTypeId < 0) {
+ //msgTypeId is an int64_t, so it is printed as long long (was %i, which expects an int)
+ L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %lli for scope/topic %s/%s", receiver->serType, (long long)msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ return;
}
- return msgTypeId;
-}
+ psa_websocket_serializer_entry_t *serializer =
pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin,
receiver->serType, msgTypeId);
-static inline void
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver,
psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr,
const char* payload, size_t payloadSize) {
- //NOTE receiver->subscribers.mutex locked
- void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id,
entry->msgTypes);
- pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
-
- if (msgSer!= NULL && msgTypeId != 0) {
- void *deSerializedMsg = NULL;
- bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion,
hdr);
- if (validVersion) {
- struct iovec deSerializeBuffer;
- deSerializeBuffer.iov_base = (void *)payload;
- deSerializeBuffer.iov_len = payloadSize;
- celix_status_t status = msgSer->deserialize(msgSer->handle,
&deSerializeBuffer, 0, &deSerializedMsg);
-
- if (status == CELIX_SUCCESS) {
- hash_map_iterator_t iter =
hashMapIterator_construct(entry->subscriberServices);
- bool release = true;
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_subscriber_t *svc =
hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deSerializedMsg, NULL, &release);
- if (!release && hashMapIterator_hasNext(&iter)) {
- //receive function has taken ownership and still more
receive function to come ..
- //deserialize again for new message
- status = msgSer->deserialize(msgSer->handle,
&deSerializeBuffer, 0, &deSerializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg
type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ?
"(null)" : receiver->scope, receiver->topic);
- break;
- }
- release = true;
+ if(serializer == NULL) {
+ //acquire took the serializers read lock even though nothing was found; drop it before returning
+ pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
+ L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %lli for scope/topic %s/%s", receiver->serType, (long long)msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ return;
+ }
+
+ void *deSerializedMsg = NULL;
+
+ celix_version_t* version =
celix_version_createVersionFromString(serializer->version);
+ bool validVersion = psa_websocket_checkVersion(version, hdr);
+ celix_version_destroy(version);
+ if (validVersion) {
+ struct iovec deSerializeBuffer;
+ deSerializeBuffer.iov_base = (void *)payload;
+ deSerializeBuffer.iov_len = payloadSize;
+ celix_status_t status =
serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0,
&deSerializedMsg);
+
+ if (status == CELIX_SUCCESS) {
+ hash_map_iterator_t iter =
hashMapIterator_construct(entry->subscriberServices);
+ bool release = true;
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, serializer->fqn, msgTypeId,
deSerializedMsg, NULL, &release);
+ if (!release && hashMapIterator_hasNext(&iter)) {
+ //receive function has taken ownership and still more
receive function to come ..
+ //deserialize again for new message
+ status =
serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0,
&deSerializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type
%s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)"
: receiver->scope, receiver->topic);
+ break;
}
+ release = true;
}
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle,
deSerializedMsg);
- }
- } else {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for
scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" :
receiver->scope, receiver->topic);
}
+ if (release) {
+ serializer->svc->freeDeserializedMsg(serializer->svc->handle,
deSerializedMsg);
+ }
+ } else {
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for
scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" :
receiver->scope, receiver->topic);
}
- } else {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X,
fqn %s", msgTypeId, hdr->id);
}
+
+ pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
}
static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver,
const char *msg, size_t msgSize) {
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
similarity index 92%
rename from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
rename to
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
index 4df42a0..55d5255 100644
---
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
+++
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
@@ -30,15 +30,15 @@ pubsub_websocket_topic_receiver_t*
pubsub_websocketTopicReceiver_create(celix_bu
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- long serializerSvcId,
- pubsub_serializer_service_t *serializer);
+ const char *serType,
+ void *admin);
void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t
*receiver);
const char*
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t
*receiver);
const char*
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t
*receiver);
const char*
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t *receiver);
+//Returns the serialization type string of the given topic receiver.
+const char *pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t *receiver);
-long
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t
*receiver);
void
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t
*unconnectedUrls);
void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort);
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
similarity index 77%
rename from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
rename to
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index ac3108c..98a1ad7 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -28,11 +28,11 @@
#include "pubsub_websocket_topic_sender.h"
#include "pubsub_psa_websocket_constants.h"
#include "pubsub_websocket_common.h"
-#include <uuid/uuid.h>
#include <jansson.h>
#include "celix_constants.h"
#include "http_admin/api.h"
#include "civetweb.h"
+#include "pubsub_websocket_admin.h"
#define FIRST_SEND_DELAY_IN_SECONDS 2
@@ -48,11 +48,11 @@
struct pubsub_websocket_topic_sender {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- long serializerSvcId;
- pubsub_serializer_service_t *serializer;
+ void *admin;
char *scope;
char *topic;
+ char *serType;
char scopeAndTopicFilter[5];
char *uri;
@@ -73,16 +73,13 @@ struct pubsub_websocket_topic_sender {
typedef struct psa_websocket_send_msg_entry {
pubsub_websocket_msg_header_t header; //partially filled header (only
seqnr and time needs to be updated per send)
- pubsub_msg_serializer_t *msgSer;
- celix_thread_mutex_t sendLock; //protects send & header(.seqNr)
+ uint32_t type; //msg type id (hash of fqn)
} psa_websocket_send_msg_entry_t;
typedef struct psa_websocket_bounded_service_entry {
pubsub_websocket_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
- hash_map_t *msgTypeIds; //key = msg name, value = msg type id
hash_map_t *msgEntries; //key = msg type id, value =
psa_websocket_send_msg_entry_t
int getCount;
} psa_websocket_bounded_service_entry_t;
@@ -102,15 +99,22 @@ pubsub_websocket_topic_sender_t*
pubsub_websocketTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- long serializerSvcId,
- pubsub_serializer_service_t *ser) {
+ const char *serType,
+ void *admin) {
pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerSvcId = serializerSvcId;
- sender->serializer = ser;
+ sender->serType = celix_utils_strdup(serType);
+
+ if(sender->serType == NULL) {
+ L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType");
+ free(sender);
+ return NULL;
+ }
+
psa_websocket_setScopeAndTopicFilter(scope, topic,
sender->scopeAndTopicFilter);
sender->uri = psa_websocket_createURI(scope, topic);
+ sender->admin = admin;
if (sender->uri != NULL) {
celix_properties_t *props = celix_properties_create();
@@ -171,8 +175,6 @@ void
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_bounded_service_entry_t *entry =
hashMapIterator_nextValue(&iter);
if (entry != NULL) {
-
sender->serializer->destroySerializerMap(sender->serializer->handle,
entry->msgTypes);
-
hash_map_iterator_t iter2 =
hashMapIterator_construct(entry->msgEntries);
while (hashMapIterator_hasNext(&iter2)) {
psa_websocket_send_msg_entry_t *msgEntry =
hashMapIterator_nextValue(&iter2);
@@ -196,14 +198,11 @@ void
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
}
free(sender->topic);
free(sender->uri);
+ free(sender->serType);
free(sender);
}
}
-long
pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t
*sender) {
- return sender->serializerSvcId;
-}
-
const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t
*sender) {
return sender->scope;
}
@@ -216,9 +215,16 @@ const char*
pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
return sender->uri;
}
+//Returns the sender's serialization type string (sender->serType); the sender frees it in destroy.
+const char*
pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t
*sender) {
+ return sender->serType;
+}
+
static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char*
msgType, unsigned int* msgTypeId) {
psa_websocket_bounded_service_entry_t *entry =
(psa_websocket_bounded_service_entry_t *) handle;
- *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds,
msgType);
+    //Publisher service callback: resolves the message id for msgType through the admin.
+    int64_t rc = pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serType, msgType);
+    if (rc < 0) {
+        //Unknown message type: always write the out parameter (0, as the removed map lookup did)
+        //and report the failure instead of returning success with *msgTypeId left untouched.
+        *msgTypeId = 0;
+        return -1;
+    }
+    *msgTypeId = (unsigned int)rc;
return 0;
}
@@ -227,7 +233,7 @@ static void* psa_websocket_getPublisherService(void
*handle, const celix_bundle_
long bndId = celix_bundle_getId(requestingBundle);
celixThreadMutex_lock(&sender->boundedServices.mutex);
- psa_websocket_bounded_service_entry_t *entry =
hashMap_get(sender->boundedServices.map, (void*)bndId);
+ psa_websocket_bounded_service_entry_t *entry =
hashMap_get(sender->boundedServices.map, (void *) bndId);
if (entry != NULL) {
entry->getCount += 1;
} else {
@@ -236,33 +242,10 @@ static void* psa_websocket_getPublisherService(void
*handle, const celix_bundle_
entry->parent = sender;
entry->bndId = bndId;
entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
- entry->msgTypeIds = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
-
- int rc =
sender->serializer->createSerializerMap(sender->serializer->handle,
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
- if (rc == 0) {
- hash_map_iterator_t iter =
hashMapIterator_construct(entry->msgTypes);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *hashMapEntry =
hashMapIterator_nextEntry(&iter);
- void *key = hashMapEntry_getKey(hashMapEntry);
- psa_websocket_send_msg_entry_t *sendEntry = calloc(1,
sizeof(*sendEntry));
- sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
- sendEntry->header.id = sendEntry->msgSer->msgName;
- int major;
- int minor;
- version_getMajor(sendEntry->msgSer->msgVersion, &major);
- version_getMinor(sendEntry->msgSer->msgVersion, &minor);
- sendEntry->header.major = (uint8_t)major;
- sendEntry->header.minor = (uint8_t)minor;
- hashMap_put(entry->msgEntries, key, sendEntry);
- hashMap_put(entry->msgTypeIds,
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t)
sendEntry->msgSer->msgId);
- }
- entry->service.handle = entry;
- entry->service.localMsgTypeIdForMsgType =
psa_websocket_localMsgTypeIdForMsgType;
- entry->service.send = psa_websocket_topicPublicationSend;
- hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
- } else {
- L_ERROR("Error creating serializer map for websocket TopicSender
%s/%s", sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- }
+ entry->service.handle = entry;
+ entry->service.localMsgTypeIdForMsgType =
psa_websocket_localMsgTypeIdForMsgType;
+ entry->service.send = psa_websocket_topicPublicationSend;
+ hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -281,10 +264,6 @@ static void psa_websocket_ungetPublisherService(void
*handle, const celix_bundle
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
- int rc =
sender->serializer->destroySerializerMap(sender->serializer->handle,
entry->msgTypes);
- if (rc != 0) {
- L_ERROR("Error destroying publisher service, serializer not
available / cannot get msg serializer map\n");
- }
hash_map_iterator_t iter =
hashMapIterator_construct(entry->msgEntries);
while (hashMapIterator_hasNext(&iter)) {
@@ -293,7 +272,6 @@ static void psa_websocket_ungetPublisherService(void
*handle, const celix_bundle
}
hashMap_destroy(entry->msgEntries, false, false);
- hashMap_destroy(entry->msgTypeIds, true, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -303,25 +281,44 @@ static int psa_websocket_topicPublicationSend(void*
handle, unsigned int msgType
int status = CELIX_SERVICE_EXCEPTION;
psa_websocket_bounded_service_entry_t *bound = handle;
pubsub_websocket_topic_sender_t *sender = bound->parent;
+
+ psa_websocket_serializer_entry_t *serializer =
pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin,
sender->serType, msgTypeId);
+
+ if(serializer == NULL) {
+ pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
+ L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with
serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId,
sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return CELIX_SERVICE_EXCEPTION;
+ }
+
psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries,
(void *) (uintptr_t) (msgTypeId));
- if (sender->sockConnection != NULL && entry != NULL) {
+ if(entry == NULL) {
+ entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t));
+ entry->type = msgTypeId;
+ entry->header.id = serializer->fqn;
+ celix_version_t* version =
celix_version_createVersionFromString(serializer->version);
+ entry->header.major = (uint8_t)celix_version_getMajor(version);
+ entry->header.minor = (uint8_t)celix_version_getMinor(version);
+ entry->header.seqNr = 0;
+ celix_version_destroy(version);
+ hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+ }
+
+ if (sender->sockConnection != NULL) {
delay_first_send_for_late_joiners(sender);
size_t serializedOutputLen = 0;
struct iovec* serializedOutput = NULL;
- status = entry->msgSer->serialize(entry->msgSer->handle, inMsg,
&serializedOutput, &serializedOutputLen);
+ status = serializer->svc->serialize(serializer->svc->handle, inMsg,
&serializedOutput, &serializedOutputLen);
if (status == CELIX_SUCCESS /*ser ok*/) {
json_error_t jsError;
- unsigned char *hdrEncoded =
calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char));
-
- celixThreadMutex_lock(&entry->sendLock);
json_t *jsMsg = json_object();
json_object_set_new_nocheck(jsMsg, "id",
json_string(entry->header.id));
json_object_set_new_nocheck(jsMsg, "major",
json_integer(entry->header.major));
json_object_set_new_nocheck(jsMsg, "minor",
json_integer(entry->header.minor));
- json_object_set_new_nocheck(jsMsg, "seqNr",
json_integer(entry->header.seqNr++));
+ uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1,
__ATOMIC_RELAXED);
+ json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr));
json_t *jsData;
jsData = json_loadb((const char *)serializedOutput->iov_base,
serializedOutput->iov_len, 0, &jsError);
@@ -339,21 +336,19 @@ static int psa_websocket_topicPublicationSend(void*
handle, unsigned int msgType
} else {
L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket, serialized
data corrupt. Error(%d;%d;%d): %s", jsError.column, jsError.line,
jsError.position, jsError.text);
}
- celixThreadMutex_unlock(&entry->sendLock);
json_decref(jsMsg); //Decrease ref count means freeing the object
- free(hdrEncoded);
- entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
serializedOutput, serializedOutputLen);
+ serializer->svc->freeSerializedMsg(serializer->svc->handle,
serializedOutput, serializedOutputLen);
} else {
L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for
scope/topic %s/%s",
- entry->msgSer->msgName, sender->scope == NULL ? "(null)" :
sender->scope, sender->topic);
+ entry->header.id, sender->scope == NULL ? "(null)" :
sender->scope, sender->topic);
}
- } else if (entry == NULL){
- L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i
for scope/topic %s/%s", msgTypeId, sender->scope == NULL ? "(null)" :
sender->scope, sender->topic);
} else { // when (sender->sockConnection == NULL) we dont have a client,
but we do have a valid entry
status = CELIX_SUCCESS; // Not an error, just nothing to do
}
+ pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
+
return status;
}
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
similarity index 90%
rename from
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
rename to
bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
index 7a839d1..8f8cebf 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h
+++
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
@@ -30,14 +30,13 @@ pubsub_websocket_topic_sender_t*
pubsub_websocketTopicSender_create(
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- long serializerSvcId,
- pubsub_serializer_service_t *ser);
+ const char *serType,
+ void *admin);
void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t
*sender);
const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t
*sender);
const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t
*sender);
const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t
*sender);
-
-long
pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t
*sender);
+const char
*pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t
*sender);
#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H
diff --git a/bundles/pubsub/test/CMakeLists.txt
b/bundles/pubsub/test/CMakeLists.txt
index 7be3658..fd16f9e 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -450,18 +450,18 @@ if (BUILD_PUBSUB_PSA_WS)
LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
DIR ${CMAKE_CURRENT_BINARY_DIR}
PROPERTIES
- LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
- CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
- USE_WEBSOCKETS=true
- LISTENING_PORTS=8080
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+ USE_WEBSOCKETS=true
+ LISTENING_PORTS=8080
BUNDLES
- Celix::pubsub_serializer_json
- Celix::http_admin
- Celix::pubsub_topology_manager
- Celix::pubsub_admin_websocket
- pubsub_sut
- pubsub_tst
- )
+ Celix::pubsub_serializer_json
+ Celix::http_admin
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_websocket
+ pubsub_sut
+ pubsub_tst
+ )
target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api
${CppUTest_LIBRARIES} Jansson Celix::dfi civetweb_shared)
target_include_directories(pubsub_websocket_tests SYSTEM PRIVATE
${CppUTest_INCLUDE_DIR} test)
add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>)
@@ -495,6 +495,58 @@ if (BUILD_PUBSUB_PSA_WS)
add_test(NAME pstm_deadlock_websocket_test COMMAND
pstm_deadlock_websocket_test WORKING_DIRECTORY
$<TARGET_PROPERTY:pstm_deadlock_websocket_test,CONTAINER_LOC>)
setup_target_for_coverage(pstm_deadlock_websocket_test SCAN_DIR ..)
+
+ add_celix_container(pubsub_websocket_v2_tests
+ USE_CONFIG
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+ USE_WEBSOCKETS=true
+ LISTENING_PORTS=8080
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::http_admin
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_websocket_v2
+ pubsub_sut
+ pubsub_tst
+ pubsub_serializer
+ )
+ target_link_libraries(pubsub_websocket_v2_tests PRIVATE Celix::pubsub_api
${CppUTest_LIBRARIES} Jansson civetweb_shared)
+ target_include_directories(pubsub_websocket_v2_tests SYSTEM PRIVATE
${CppUTest_INCLUDE_DIR} test)
+ add_test(NAME pubsub_websocket_v2_tests COMMAND pubsub_websocket_v2_tests
WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_v2_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_websocket_v2_tests SCAN_DIR ..)
+
+ add_celix_container(pstm_deadlock_websocket_v2_test
+ USE_CONFIG #ensures that a config.properties will be created with
the launch bundles.
+ LAUNCHER_SRC
${CMAKE_CURRENT_LIST_DIR}/pstm_deadlock_test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+ BUNDLES
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_websocket_v2
+ Celix::shell
+ Celix::shell_tui
+ pubsub_serializer
+ )
+ target_compile_definitions(pstm_deadlock_websocket_v2_test PRIVATE
-DDEADLOCK_SUT_BUNDLE_FILE=\"${DEADLOCK_SUT_BUNDLE_FILE}\")
+ target_link_libraries(pstm_deadlock_websocket_v2_test PRIVATE
Celix::pubsub_api GTest::gtest GTest::gtest_main Jansson Celix::dfi
civetweb_shared)
+ target_include_directories(pstm_deadlock_websocket_v2_test SYSTEM PRIVATE
pstm_deadlock_websocket_v2_test)
+
+ #Note we do not link to bundles; as a result (to ensure a bundle zip file is
created) a dependency on the bundle is needed.
+ add_dependencies(pstm_deadlock_websocket_v2_test
pubsub_deadlock_sut_bundle)
+
+ #Framework "bundle" has no cache dir. By default the cwd is used as the
"cache dir".
+ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/msg.descriptor
${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_test/META-INF/descriptors/msg.descriptor
COPYONLY)
+
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/meta_data/deadlock.scope.properties
${CMAKE_CURRENT_BINARY_DIR}/pstm_deadlock_websocket_v2_test/META-INF/topics/pub/deadlock.properties
COPYONLY)
+
+ add_test(NAME pstm_deadlock_websocket_v2_test COMMAND
pstm_deadlock_websocket_v2_test WORKING_DIRECTORY
$<TARGET_PROPERTY:pstm_deadlock_websocket_v2_test,CONTAINER_LOC>)
+ setup_target_for_coverage(pstm_deadlock_websocket_v2_test SCAN_DIR ..)
endif()
if (BUILD_PUBSUB_PSA_ZMQ)