This is an automated email from the ASF dual-hosted git repository. rlenferink pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push: new de7c3a1 Added pubsub admin websocket new 41d1ba9 Merge pull request #39 from dhbfischer/feature/pubsub_admin_websocket de7c3a1 is described below commit de7c3a19359a9908c56801dd081630400729f288 Author: dfischer <m...@daanfischer.nl> AuthorDate: Tue Aug 6 17:26:56 2019 +0200 Added pubsub admin websocket --- bundles/pubsub/CMakeLists.txt | 1 + .../pubsub/pubsub_admin_websocket/CMakeLists.txt | 53 ++ .../pubsub_admin_websocket/src/psa_activator.c | 128 ++++ .../src/pubsub_psa_websocket_constants.h | 48 ++ .../src/pubsub_websocket_admin.c | 630 +++++++++++++++++ .../src/pubsub_websocket_admin.h | 54 ++ .../src/pubsub_websocket_common.c | 73 ++ .../src/pubsub_websocket_common.h | 55 ++ .../src/pubsub_websocket_topic_receiver.c | 787 +++++++++++++++++++++ .../src/pubsub_websocket_topic_receiver.h | 50 ++ .../src/pubsub_websocket_topic_sender.c | 477 +++++++++++++ .../src/pubsub_websocket_topic_sender.h | 48 ++ bundles/pubsub/test/CMakeLists.txt | 21 + bundles/pubsub/test/meta_data/ping.properties | 1 + 14 files changed, 2426 insertions(+) diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt index 0373347..a084aa5 100644 --- a/bundles/pubsub/CMakeLists.txt +++ b/bundles/pubsub/CMakeLists.txt @@ -34,6 +34,7 @@ if (PUBSUB) add_subdirectory(pubsub_admin_tcp) add_subdirectory(pubsub_admin_nanomsg) add_subdirectory(pubsub_admin_udp_mc) + add_subdirectory(pubsub_admin_websocket) add_subdirectory(keygen) add_subdirectory(mock) diff --git a/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt new file mode 100644 index 0000000..c992d5e --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/CMakeLists.txt @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +find_package(Jansson REQUIRED) +find_package(UUID REQUIRED) + +if(NOT UUID_LIBRARY) + #i.e. not found for OSX + set(UUID_LIBRARY "") + set(UUID_INCLUDE_DIRS "") +endif() + +add_celix_bundle(celix_pubsub_admin_websocket + BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket" + VERSION "1.0.0" + GROUP "Celix/PubSub" + SOURCES + src/psa_activator.c + src/pubsub_websocket_admin.c + src/pubsub_websocket_topic_sender.c + src/pubsub_websocket_topic_receiver.c + src/pubsub_websocket_common.c +) + +set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN") +target_link_libraries(celix_pubsub_admin_websocket PRIVATE + Celix::pubsub_spi + Celix::framework Celix::dfi Celix::log_helper Celix::utils + Celix::http_admin_api +) +target_include_directories(celix_pubsub_admin_websocket PRIVATE + ${JANSSON_INCLUDE_DIR} + ${UUID_INCLUDE_DIRS} + src +) + +install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub) +target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api) +add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket) diff --git a/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c new file mode 100644 index 0000000..a1d36b0 --- /dev/null 
+++ b/bundles/pubsub/pubsub_admin_websocket/src/psa_activator.c @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> + +#include "celix_api.h" +#include "pubsub_serializer.h" +#include "log_helper.h" + +#include "pubsub_admin.h" +#include "pubsub_admin_metrics.h" +#include "pubsub_websocket_admin.h" +#include "command.h" + +typedef struct psa_websocket_activator { + log_helper_t *logHelper; + + pubsub_websocket_admin_t *admin; + + long serializersTrackerId; + + pubsub_admin_service_t adminService; + long adminSvcId; + + pubsub_admin_metrics_service_t adminMetricsService; + long adminMetricsSvcId; + + command_service_t cmdSvc; + long cmdSvcId; +} psa_websocket_activator_t; + +int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t *ctx) { + act->adminSvcId = -1L; + act->cmdSvcId = -1L; + act->serializersTrackerId = -1L; + + logHelper_create(ctx, &act->logHelper); + logHelper_start(act->logHelper); + + act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper); + celix_status_t status = act->admin != NULL ? 
CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION; + + //track serializers (only json) + if (status == CELIX_SUCCESS) { + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; + opts.filter.filter = "(pubsub.serializer=json)"; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = act->admin; + opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc; + opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc; + act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } + + //register pubsub admin service + if (status == CELIX_SUCCESS) { + pubsub_admin_service_t *psaSvc = &act->adminService; + psaSvc->handle = act->admin; + psaSvc->matchPublisher = pubsub_websocketAdmin_matchPublisher; + psaSvc->matchSubscriber = pubsub_websocketAdmin_matchSubscriber; + psaSvc->matchDiscoveredEndpoint = pubsub_websocketAdmin_matchDiscoveredEndpoint; + psaSvc->setupTopicSender = pubsub_websocketAdmin_setupTopicSender; + psaSvc->teardownTopicSender = pubsub_websocketAdmin_teardownTopicSender; + psaSvc->setupTopicReceiver = pubsub_websocketAdmin_setupTopicReceiver; + psaSvc->teardownTopicReceiver = pubsub_websocketAdmin_teardownTopicReceiver; + psaSvc->addDiscoveredEndpoint = pubsub_websocketAdmin_addDiscoveredEndpoint; + psaSvc->removeDiscoveredEndpoint = pubsub_websocketAdmin_removeDiscoveredEndpoint; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_WEBSOCKET_ADMIN_TYPE); + + act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props); + } + + if (status == CELIX_SUCCESS) { + act->adminMetricsService.handle = act->admin; + act->adminMetricsService.metrics = pubsub_websocketAdmin_metrics; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_WEBSOCKET_ADMIN_TYPE); + + 
act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props); + } + + //register shell command service + { + act->cmdSvc.handle = act->admin; + act->cmdSvc.executeCommand = pubsub_websocketAdmin_executeCommand; + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_websocket"); + celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_websocket"); + celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the websocket PSA"); + act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props); + } + + return status; +} + +int psa_websocket_stop(psa_websocket_activator_t *act, celix_bundle_context_t *ctx) { + celix_bundleContext_unregisterService(ctx, act->adminSvcId); + celix_bundleContext_unregisterService(ctx, act->cmdSvcId); + celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId); + celix_bundleContext_stopTracker(ctx, act->serializersTrackerId); + pubsub_websocketAdmin_destroy(act->admin); + + logHelper_stop(act->logHelper); + logHelper_destroy(&act->logHelper); + + return CELIX_SUCCESS; +} + +CELIX_GEN_BUNDLE_ACTIVATOR(psa_websocket_activator_t, psa_websocket_start, psa_websocket_stop); diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h new file mode 100644 index 0000000..c6fba82 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_psa_websocket_constants.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_ +#define PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_ + +#define PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE 30 +#define PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE 70 +#define PSA_WEBSOCKET_DEFAULT_SCORE 30 + +#define PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY "PSA_WEBSOCKET_QOS_SAMPLE_SCORE" +#define PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY "PSA_WEBSOCKET_QOS_CONTROL_SCORE" +#define PSA_WEBSOCKET_DEFAULT_SCORE_KEY "PSA_WEBSOCKET_DEFAULT_SCORE" + + +#define PSA_WEBSOCKET_METRICS_ENABLED "PSA_WEBSOCKET_METRICS_ENABLED" +#define PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED true + +#define PUBSUB_WEBSOCKET_VERBOSE_KEY "PSA_WEBSOCKET_VERBOSE" +#define PUBSUB_WEBSOCKET_VERBOSE_DEFAULT true + +#define PUBSUB_WEBSOCKET_ADMIN_TYPE "websocket" +#define PUBSUB_WEBSOCKET_ADDRESS_KEY "websocket.socket_address" +#define PUBSUB_WEBSOCKET_PORT_KEY "websocket.socket_port" + +/** + * The static url which a subscriber should try to connect to. 
+ * The urls are space separated + */ +#define PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES "websocket.static.connect.socket_addresses" + +#endif /* PUBSUB_PSA_WEBSOCKET_CONSTANTS_H_ */ diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c new file mode 100644 index 0000000..a7fbe29 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c @@ -0,0 +1,630 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <memory.h> +#include <pubsub_endpoint.h> +#include <pubsub_serializer.h> +#include <ip_utils.h> + +#include "pubsub_utils.h" +#include "pubsub_websocket_admin.h" +#include "pubsub_psa_websocket_constants.h" +#include "pubsub_websocket_topic_sender.h" +#include "pubsub_websocket_topic_receiver.h" +#include "pubsub_websocket_common.h" + +#define L_DEBUG(...) \ + logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) +#define L_INFO(...) \ + logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__) +#define L_WARN(...) \ + logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) +#define L_ERROR(...) 
\ + logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) + +struct pubsub_websocket_admin { + celix_bundle_context_t *ctx; + log_helper_t *log; + const char *fwUUID; + + double qosSampleScore; + double qosControlScore; + double defaultScore; + + bool verbose; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t* + } serializers; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t* + } topicSenders; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_receiver_t* + } topicReceivers; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint) + } discoveredEndpoints; + +}; + +typedef struct psa_websocket_serializer_entry { + const char *serType; + long svcId; + pubsub_serializer_service_t *svc; +} psa_websocket_serializer_entry_t; + +static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint); +static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint); + + +pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper) { + pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa)); + psa->ctx = ctx; + psa->log = logHelper; + psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_WEBSOCKET_VERBOSE_KEY, PUBSUB_WEBSOCKET_VERBOSE_DEFAULT); + psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); + + psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_DEFAULT_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_SCORE); + psa->qosSampleScore =
celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE); + psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE); + + celixThreadMutex_create(&psa->serializers.mutex, NULL); + psa->serializers.map = hashMap_create(NULL, NULL, NULL, NULL); + + celixThreadMutex_create(&psa->topicSenders.mutex, NULL); + psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + celixThreadMutex_create(&psa->topicReceivers.mutex, NULL); + psa->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL); + psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + return psa; +} + +void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) { + if (psa == NULL) { + return; + } + + //note: assuming all psa registered services and service trackers are removed.
+ + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter); + pubsub_websocketTopicSender_destroy(sender); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + + celixThreadMutex_lock(&psa->topicReceivers.mutex); + iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_receiver_t *recv = hashMapIterator_nextValue(&iter); + pubsub_websocketTopicReceiver_destroy(recv); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + iter = hashMapIterator_construct(psa->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + celix_properties_t *ep = hashMapIterator_nextValue(&iter); + celix_properties_destroy(ep); + } + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + + celixThreadMutex_lock(&psa->serializers.mutex); + iter = hashMapIterator_construct(psa->serializers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_serializer_entry_t *entry = hashMapIterator_nextValue(&iter); + free(entry); + } + celixThreadMutex_unlock(&psa->serializers.mutex); + + celixThreadMutex_destroy(&psa->topicSenders.mutex); + hashMap_destroy(psa->topicSenders.map, true, false); + + celixThreadMutex_destroy(&psa->topicReceivers.mutex); + hashMap_destroy(psa->topicReceivers.map, true, false); + + celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex); + hashMap_destroy(psa->discoveredEndpoints.map, false, false); + + celixThreadMutex_destroy(&psa->serializers.mutex); + hashMap_destroy(psa->serializers.map, false, false); + + free(psa); +} + +void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) { + pubsub_websocket_admin_t *psa = handle; + + const char *serType = 
celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, NULL); + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); + + if (serType == NULL) { + L_INFO("[PSA_WEBSOCKET] Ignoring serializer service without %s property", PUBSUB_SERIALIZER_TYPE_KEY); + return; + } + + celixThreadMutex_lock(&psa->serializers.mutex); + psa_websocket_serializer_entry_t *entry = hashMap_get(psa->serializers.map, (void*)svcId); + if (entry == NULL) { + entry = calloc(1, sizeof(*entry)); + entry->serType = serType; + entry->svcId = svcId; + entry->svc = svc; + hashMap_put(psa->serializers.map, (void*)svcId, entry); + } + celixThreadMutex_unlock(&psa->serializers.mutex); +} + +void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) { + pubsub_websocket_admin_t *psa = handle; + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); + + //remove serializer + // 1) First find entry and + // 2) loop and destroy all topic sender using the serializer and + // 3) loop and destroy all topic receivers using the serializer + // Note that it is the responsibility of the topology manager to create new topic senders/receivers + + celixThreadMutex_lock(&psa->serializers.mutex); + psa_websocket_serializer_entry_t *entry = hashMap_remove(psa->serializers.map, (void*)svcId); + celixThreadMutex_unlock(&psa->serializers.mutex); + + if (entry != NULL) { + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); + pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry); + if (sender != NULL && entry->svcId == pubsub_websocketTopicSender_serializerSvcId(sender)) { + char *key = hashMapEntry_getKey(senderEntry); + hashMapIterator_remove(&iter); + pubsub_websocketTopicSender_destroy(sender); + free(key); + } + } + 
celixThreadMutex_unlock(&psa->topicSenders.mutex); + + celixThreadMutex_lock(&psa->topicReceivers.mutex); + iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); + pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry); + if (receiver != NULL && entry->svcId == pubsub_websocketTopicReceiver_serializerSvcId(receiver)) { + char *key = hashMapEntry_getKey(senderEntry); + hashMapIterator_remove(&iter); + pubsub_websocketTopicReceiver_destroy(receiver); + free(key); + } + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + free(entry); + } +} + +celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) { + pubsub_websocket_admin_t *psa = handle; + L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchPublisher"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_WEBSOCKET_ADMIN_TYPE, + psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, + topicProperties, outSerializerSvcId); + *outScore = score; + + return status; +} + +celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) { + pubsub_websocket_admin_t *psa = handle; + L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchSubscriber"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_WEBSOCKET_ADMIN_TYPE, + psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, + topicProperties, outSerializerSvcId); + if (outScore != NULL) { + *outScore = score; + } + return status; +} + 
+celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) { + pubsub_websocket_admin_t *psa = handle; + L_DEBUG("[PSA_WEBSOCKET] pubsub_websocketAdmin_matchEndpoint"); + celix_status_t status = CELIX_SUCCESS; + bool match = pubsub_utils_matchEndpoint(psa->ctx, endpoint, PUBSUB_WEBSOCKET_ADMIN_TYPE, NULL); + if (outMatch != NULL) { + *outMatch = match; + } + return status; +} + +celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) { + pubsub_websocket_admin_t *psa = handle; + celix_status_t status = CELIX_SUCCESS; + + //1) Create TopicSender + //2) Store TopicSender + //3) Connect existing endpoints + //4) set outPublisherEndpoint + + celix_properties_t *newEndpoint = NULL; + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + + celixThreadMutex_lock(&psa->serializers.mutex); + celixThreadMutex_lock(&psa->topicSenders.mutex); + pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key); + if (sender == NULL) { + psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId); + if (serEntry != NULL) { + sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc); + } + if (sender != NULL) { + const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE; + const char *serType = serEntry->serType; + newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, + serType, NULL); + + //Set endpoint visibility to local because the http server handles discovery + celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY); + + //if available also set container name + const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL); 
+ if (cn != NULL) { + celix_properties_set(newEndpoint, "container_name", cn); + } + hashMap_put(psa->topicSenders.map, key, sender); + } else { + L_ERROR("[PSA_WEBSOCKET] Error creating a TopicSender"); + free(key); + } + } else { + free(key); + L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + celixThreadMutex_unlock(&psa->serializers.mutex); + + if (newEndpoint != NULL && outPublisherEndpoint != NULL) { + *outPublisherEndpoint = newEndpoint; + } + + return status; +} + +celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic) { + pubsub_websocket_admin_t *psa = handle; + celix_status_t status = CELIX_SUCCESS; + + //1) Find and remove TopicSender from map + //2) destroy topic sender + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_entry_t *entry = hashMap_getEntry(psa->topicSenders.map, key); + if (entry != NULL) { + char *mapKey = hashMapEntry_getKey(entry); + pubsub_websocket_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key); + free(mapKey); + pubsub_websocketTopicSender_destroy(sender); + } else { + L_ERROR("[PSA_WEBSOCKET] Cannot teardown TopicSender with scope/topic %s/%s. 
Does not exists", scope, topic); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + free(key); + + return status; +} + +celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) { + pubsub_websocket_admin_t *psa = handle; + + celix_properties_t *newEndpoint = NULL; + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->serializers.mutex); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); + if (receiver == NULL) { + psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId); + if (serEntry != NULL) { + receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serializerSvcId, serEntry->svc); + } else { + L_ERROR("[PSA_WEBSOCKET] Cannot find serializer for TopicSender %s/%s", scope, topic); + } + if (receiver != NULL) { + const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE; + const char *serType = serEntry->serType; + newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, + PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL); + + //Set endpoint visibility to local because the http server handles discovery + celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY); + + //if available also set container name + const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL); + if (cn != NULL) { + celix_properties_set(newEndpoint, "container_name", cn); + } + hashMap_put(psa->topicReceivers.map, key, receiver); + } else { + L_ERROR("[PSA_WEBSOCKET] Error creating a TopicReceiver."); + free(key); + } + } else { + free(key); + L_ERROR("[PSA_WEBSOCKET] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", 
scope, topic); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + celixThreadMutex_unlock(&psa->serializers.mutex); + + if (receiver != NULL && newEndpoint != NULL) { + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); + if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } + } + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + } + + if (newEndpoint != NULL && outSubscriberEndpoint != NULL) { + *outSubscriberEndpoint = newEndpoint; + } + + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic) { + pubsub_websocket_admin_t *psa = handle; + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + hash_map_entry_t *entry = hashMap_getEntry(psa->topicReceivers.map, key); + free(key); + if (entry != NULL) { + char *receiverKey = hashMapEntry_getKey(entry); + pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(entry); + hashMap_remove(psa->topicReceivers.map, receiverKey); + + free(receiverKey); + pubsub_websocketTopicReceiver_destroy(receiver); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + celix_status_t status = CELIX_SUCCESS; + return status; +} + +static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint) { + //note can be called with discoveredEndpoint.mutex lock + celix_status_t status = CELIX_SUCCESS;
+ + const char *scope = pubsub_websocketTopicReceiver_scope(receiver); + const char *topic = pubsub_websocketTopicReceiver_topic(receiver); + + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); + const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + const char *sockAddress = celix_properties_get(endpoint, PUBSUB_WEBSOCKET_ADDRESS_KEY, NULL); + long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_WEBSOCKET_PORT_KEY, -1L); + + bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0; + + if (publisher && (sockAddress == NULL || sockPort < 0)) { + L_WARN("[PSA WEBSOCKET] Error got endpoint without websocket address/port or endpoint type. Properties:"); + const char *key = NULL; + CELIX_PROPERTIES_FOR_EACH(endpoint, key) { + L_WARN("[PSA WEBSOCKET] |- %s=%s\n", key, celix_properties_get(endpoint, key, NULL)); + } + status = CELIX_BUNDLE_EXCEPTION; + } else { + if (eScope != NULL && eTopic != NULL && + strncmp(eScope, scope, 1024 * 1024) == 0 && + strncmp(eTopic, topic, 1024 * 1024) == 0) { + char *uri = psa_websocket_createURI(eScope, eTopic); + pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress, sockPort, uri); + } + } + + return status; +} + +celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) { + pubsub_websocket_admin_t *psa = handle; + + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); + + if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + celixThreadMutex_lock(&psa->topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter); + pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + } + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + celix_properties_t *cpy = celix_properties_copy(endpoint); + const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, NULL); + hashMap_put(psa->discoveredEndpoints.map, (void*)uuid, cpy); + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + + celix_status_t status = CELIX_SUCCESS; + return status; +} + + +static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint) { + //note can be called with discoveredEndpoint.mutex lock + celix_status_t status = CELIX_SUCCESS; + + const char *scope = pubsub_websocketTopicReceiver_scope(receiver); + const char *topic = pubsub_websocketTopicReceiver_topic(receiver); + + const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + + if (eScope != NULL && eTopic != NULL && + strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 1024 * 1024) == 0) { + char *uri = psa_websocket_createURI(eScope, eTopic); + pubsub_websocketTopicReceiver_disconnectFrom(receiver, uri); + + } + + return status; +} + +celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) { + pubsub_websocket_admin_t *psa = handle; + + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); + + if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + celixThreadMutex_lock(&psa->topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + 
pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + pubsub_websocketAdmin_disconnectEndpointFromReceiver(psa, receiver, endpoint); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + } + + celixThreadMutex_lock(&psa->discoveredEndpoints.mutex); + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); + celix_properties_t *found = hashMap_remove(psa->discoveredEndpoints.map, (void*)uuid); + celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); + + if (found != NULL) { + celix_properties_destroy(found); + } + + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) { + pubsub_websocket_admin_t *psa = handle; + celix_status_t status = CELIX_SUCCESS; + + fprintf(out, "\n"); + fprintf(out, "Topic Senders:\n"); + celixThreadMutex_lock(&psa->serializers.mutex); + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter); + long serSvcId = pubsub_websocketTopicSender_serializerSvcId(sender); + psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId); + const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType; + const char *scope = pubsub_websocketTopicSender_scope(sender); + const char *topic = pubsub_websocketTopicSender_topic(sender); + const char *url = pubsub_websocketTopicSender_url(sender); +// const char *postUrl = pubsub_websocketTopicSender_isStatic(sender) ? 
" (static)" : ""; + fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); + fprintf(out, " |- serializer type = %s\n", serType); + fprintf(out, " |- url = %s\n", url); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + celixThreadMutex_unlock(&psa->serializers.mutex); + + fprintf(out, "\n"); + fprintf(out, "\nTopic Receivers:\n"); + celixThreadMutex_lock(&psa->serializers.mutex); + celixThreadMutex_lock(&psa->topicReceivers.mutex); + iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + long serSvcId = pubsub_websocketTopicReceiver_serializerSvcId(receiver); + psa_websocket_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serSvcId); + const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType; + const char *scope = pubsub_websocketTopicReceiver_scope(receiver); + const char *topic = pubsub_websocketTopicReceiver_topic(receiver); + + celix_array_list_t *connected = celix_arrayList_create(); + celix_array_list_t *unconnected = celix_arrayList_create(); + pubsub_websocketTopicReceiver_listConnections(receiver, connected, unconnected); + + fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic); + fprintf(out, " |- serializer type = %s\n", serType); + for (int i = 0; i < celix_arrayList_size(connected); ++i) { + char *url = celix_arrayList_get(connected, i); + fprintf(out, " |- connected url = %s\n", url); + free(url); + } + for (int i = 0; i < celix_arrayList_size(unconnected); ++i) { + char *url = celix_arrayList_get(unconnected, i); + fprintf(out, " |- unconnected url = %s\n", url); + free(url); + } + celix_arrayList_destroy(connected); + celix_arrayList_destroy(unconnected); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + celixThreadMutex_unlock(&psa->serializers.mutex); + fprintf(out, "\n"); + + return status; +} + +pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void 
*handle) { + pubsub_websocket_admin_t *psa = handle; + pubsub_admin_metrics_t *result = calloc(1, sizeof(*result)); + snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_WEBSOCKET_ADMIN_TYPE); + result->senders = celix_arrayList_create(); + result->receivers = celix_arrayList_create(); + + celixThreadMutex_lock(&psa->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_sender_t *sender = hashMapIterator_nextValue(&iter); + pubsub_admin_sender_metrics_t *metrics = pubsub_websocketTopicSender_metrics(sender); + celix_arrayList_add(result->senders, metrics); + } + celixThreadMutex_unlock(&psa->topicSenders.mutex); + + celixThreadMutex_lock(&psa->topicReceivers.mutex); + iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + pubsub_admin_receiver_metrics_t *metrics = pubsub_websocketTopicReceiver_metrics(receiver); + celix_arrayList_add(result->receivers, metrics); + } + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + + return result; +} diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h new file mode 100644 index 0000000..62a14a9 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.h @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef CELIX_PUBSUB_WEBSOCKET_ADMIN_H +#define CELIX_PUBSUB_WEBSOCKET_ADMIN_H + +#include <pubsub_admin_metrics.h> +#include "celix_api.h" +#include "log_helper.h" +#include "pubsub_psa_websocket_constants.h" + +typedef struct pubsub_websocket_admin pubsub_websocket_admin_t; + +pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper); +void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa); + +celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId); +celix_status_t pubsub_websocketAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId); +celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match); + +celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint); +celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic); + +celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint); +celix_status_t 
pubsub_websocketAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic); + +celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint); +celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint); + +void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props); +void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props); + +celix_status_t pubsub_websocketAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream); + +pubsub_admin_metrics_t* pubsub_websocketAdmin_metrics(void *handle); + +#endif //CELIX_PUBSUB_WEBSOCKET_ADMIN_H + diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c new file mode 100644 index 0000000..ce85e40 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Creates the websocket uri for a scope/topic combination.
 *
 * The result has the form "/pubsub/<scope>/<topic>". When no scope is given
 * "default" is used as scope.
 *
 * @param scope The scope, may be NULL.
 * @param topic The topic. When NULL no uri can be created.
 * @return A heap allocated uri string which must be freed by the caller, or
 *         NULL when topic is NULL or the string could not be allocated.
 */
char *psa_websocket_createURI(const char *scope, const char *topic) {
    if (topic == NULL) {
        return NULL;
    }

    char *uri = NULL;
    int rc = asprintf(&uri, "/pubsub/%s/%s", scope != NULL ? scope : "default", topic);
    if (rc < 0) {
        //on failure the content of uri is undefined, so never hand it to the caller
        uri = NULL;
    }
    return uri;
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef CELIX_PUBSUB_WEBSOCKET_COMMON_H +#define CELIX_PUBSUB_WEBSOCKET_COMMON_H + +#include <utils.h> +#include <stdint.h> + +#include "version.h" + + +struct pubsub_websocket_msg_header { + uint32_t type; //msg type id (hash of fqn) + uint8_t major; + uint8_t minor; + uint32_t seqNr; + unsigned char originUUID[16]; + uint64_t sendtimeSeconds; //seconds since epoch + uint64_t sendTimeNanoseconds; //ns since epoch +}; + +typedef struct pubsub_websocket_msg_header pubsub_websocket_msg_header_t; + +struct pubsub_websocket_msg { + pubsub_websocket_msg_header_t header; + unsigned int payloadSize; + char payload[]; +}; + +typedef struct pubsub_websocket_msg pubsub_websocket_msg_t; + +int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); +void psa_websocket_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter); +char *psa_websocket_createURI(const char *scope, const char *topic); + +bool psa_websocket_checkVersion(version_pt msgVersion, const pubsub_websocket_msg_header_t *hdr); + +#endif //CELIX_PUBSUB_WEBSOCKET_COMMON_H diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c new file mode 
100644 index 0000000..4a41a0e --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c @@ -0,0 +1,787 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <pubsub_serializer.h> +#include <stdlib.h> +#include <pubsub/subscriber.h> +#include <memory.h> +#include <pubsub_constants.h> +#include <sys/epoll.h> +#include <assert.h> +#include <pubsub_endpoint.h> +#include <arpa/inet.h> +#include <log_helper.h> +#include <math.h> +#include "pubsub_websocket_topic_receiver.h" +#include "pubsub_psa_websocket_constants.h" +#include "pubsub_websocket_common.h" + +#include <uuid/uuid.h> +#include <pubsub_admin_metrics.h> +#include <http_admin/api.h> + +#ifndef UUID_STR_LEN +#define UUID_STR_LEN 37 +#endif + + +#define L_DEBUG(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) +#define L_INFO(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) +#define L_WARN(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) +#define L_ERROR(...) 
/**
 * State of a single websocket topic receiver (one scope/topic combination).
 */
struct pubsub_websocket_topic_receiver {
    celix_bundle_context_t *ctx;
    log_helper_t *logHelper; //used by the L_DEBUG/L_INFO/L_WARN/L_ERROR macros
    long serializerSvcId;
    pubsub_serializer_service_t *serializer;
    char *scope; //heap copy of the scope given at creation
    char *topic; //heap copy of the topic given at creation
    char scopeAndTopicFilter[5]; //filled by psa_websocket_setScopeAndTopicFilter
    char *uri; //created with psa_websocket_createURI(scope, topic)
    bool metricsEnabled; //read from the PSA_WEBSOCKET_METRICS_ENABLED bundle context property

    pubsub_websocket_rcv_buffer_t recvBuffer;

    struct {
        celix_thread_t thread;
        celix_thread_mutex_t mutex; //protects running
        bool running;
    } recvThread;

    struct {
        celix_thread_mutex_t mutex;
        hash_map_t *map; //key = url (host:port), value = psa_websocket_requested_connection_entry_t*
        bool allConnected; //true if all requested connections are connected
    } requestedConnections;

    long subscriberTrackerId;
    struct {
        celix_thread_mutex_t mutex;
        hash_map_t *map; //key = bnd id, value = psa_websocket_subscriber_entry_t
        bool allInitialized;
    } subscribers;
};
+} psa_websocket_requested_connection_entry_t; + +typedef struct psa_websocket_subscriber_metrics_entry_t { + unsigned int msgTypeId; + uuid_t origin; + + unsigned long nrOfMessagesReceived; + unsigned long nrOfSerializationErrors; + struct timespec lastMessageReceived; + double averageTimeBetweenMessagesInSeconds; + double averageSerializationTimeInSeconds; + double averageDelayInSeconds; + double maxDelayInSeconds; + double minDelayInSeconds; + unsigned int lastSeqNr; + unsigned long nrOfMissingSeqNumbers; +} psa_websocket_subscriber_metrics_entry_t; + +typedef struct psa_websocket_subscriber_entry { + int usageCount; + hash_map_t *msgTypes; //map from serializer svc + hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_websocket_subscriber_metrics_entry_t* + pubsub_subscriber_t *svc; + bool initialized; //true if the init function is called through the receive thread +} psa_websocket_subscriber_entry_t; + + +static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void* psa_websocket_recvThread(void * data); +static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver); +static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver); + +static int psa_websocketTopicReceiver_data(struct mg_connection *connection, int op_code, char *data, size_t length, void *handle); +static void psa_websocketTopicReceiver_close(const struct mg_connection *connection, void *handle); + + +pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx, + log_helper_t *logHelper, + const char *scope, + const char *topic, + const celix_properties_t *topicProperties, + long serializerSvcId, + 
pubsub_serializer_service_t *serializer) { + pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); + receiver->ctx = ctx; + receiver->logHelper = logHelper; + receiver->serializerSvcId = serializerSvcId; + receiver->serializer = serializer; + receiver->scope = strndup(scope, 1024 * 1024); + receiver->topic = strndup(topic, 1024 * 1024); + psa_websocket_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter); + receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED); + + receiver->uri = psa_websocket_createURI(scope, topic); + + if (receiver->uri != NULL) { + celixThreadMutex_create(&receiver->subscribers.mutex, NULL); + celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL); + celixThreadMutex_create(&receiver->recvThread.mutex, NULL); + celixThreadMutex_create(&receiver->recvBuffer.mutex, NULL); + + receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); + receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + arrayList_create(&receiver->recvBuffer.list); + arrayList_create(&receiver->recvBuffer.rcvTimes); + } + + //track subscribers + if (receiver->uri != NULL) { + int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + char buf[size+1]; + snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.ignoreServiceLanguage = true; + opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; + opts.filter.filter = buf; + opts.callbackHandle = receiver; + opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber; + opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber; + + receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } + + + const char *staticConnects = 
celix_properties_get(topicProperties, PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES, NULL); + if (staticConnects != NULL) { + char *copy = strndup(staticConnects, 1024*1024); + char* addr; + char* save = copy; + + while ((addr = strtok_r(save, " ", &save))) { + char *colon = strchr(addr, ':'); + if (colon == NULL) { + continue; + } + + char *sockAddr = NULL; + asprintf(&sockAddr, "%.*s", (int)(colon - addr), addr); + + long sockPort = atol((colon + 1)); + + char *key = NULL; + asprintf(&key, "%s:%li", sockAddr, sockPort); + + + if (sockPort > 0) { + psa_websocket_requested_connection_entry_t *entry = calloc(1, sizeof(*entry)); + entry->key = key; + entry->uri = strndup(receiver->uri, 1024 * 1024); + entry->socketAddress = sockAddr; + entry->socketPort = sockPort; + entry->connected = false; + entry->statically = true; + entry->recvBuffer = &receiver->recvBuffer; + hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry); + } else { + L_WARN("[PSA_WEBSOCKET_TR] Invalid static socket address %s", addr); + free(key); + free(sockAddr); + } + } + free(copy); + } + + + if (receiver->uri != NULL) { + receiver->recvThread.running = true; + celixThread_create(&receiver->recvThread.thread, NULL, psa_websocket_recvThread, receiver); + char name[64]; + snprintf(name, 64, "WEBSOCKET TR %s/%s", scope, topic); + celixThread_setName(&receiver->recvThread.thread, name); + } + + if (receiver->uri == NULL) { + free(receiver->scope); + free(receiver->topic); + free(receiver); + receiver = NULL; + L_ERROR("[PSA_WEBSOCKET] Cannot create TopicReceiver for %s/%s", scope, topic); + } + + return receiver; +} + +void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver) { + if (receiver != NULL) { + + celixThreadMutex_lock(&receiver->recvThread.mutex); + receiver->recvThread.running = false; + celixThreadMutex_unlock(&receiver->recvThread.mutex); + celixThread_join(receiver->recvThread.thread, NULL); + + 
celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_t *origins = hashMapIterator_nextValue(&iter2); + hashMap_destroy(origins, true, true); + } + hashMap_destroy(entry->metrics, false, false); + + receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); + free(entry); + } + + } + hashMap_destroy(receiver->subscribers.map, false, false); + + + celixThreadMutex_unlock(&receiver->subscribers.mutex); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + iter = hashMapIterator_construct(receiver->requestedConnections.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + if(entry->connected) { + mg_close_connection(entry->sockConnection); + } + free(entry->uri); + free(entry->socketAddress); + free(entry->key); + free(entry); + } + } + hashMap_destroy(receiver->requestedConnections.map, false, false); + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + celixThreadMutex_destroy(&receiver->subscribers.mutex); + celixThreadMutex_destroy(&receiver->requestedConnections.mutex); + celixThreadMutex_destroy(&receiver->recvThread.mutex); + + celixThreadMutex_destroy(&receiver->recvBuffer.mutex); + int msgBufSize = celix_arrayList_size(receiver->recvBuffer.list); + while(msgBufSize > 0) { + pubsub_websocket_msg_t *msg = celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1); + free(msg); + msgBufSize--; + } + celix_arrayList_destroy(receiver->recvBuffer.list); + + int 
/**
 * Returns the scope of the receiver. The string is owned by the receiver.
 */
const char* pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t *receiver) {
    return receiver->scope;
}
/**
 * Returns the topic of the receiver. The string is owned by the receiver.
 */
const char* pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t *receiver) {
    return receiver->topic;
}

/**
 * Returns the serializer service id the receiver was created with.
 */
long pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t *receiver) {
    return receiver->serializerSvcId;
}
" (static)" : ""); + if (entry->connected) { + celix_arrayList_add(connectedUrls, url); + } else { + celix_arrayList_add(unconnectedUrls, url); + } + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); +} + + +void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *receiver, const char *socketAddress, long socketPort, const char *uri) { + L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s connecting to websocket uri %s", receiver->scope, receiver->topic, uri); + + char *key = NULL; + asprintf(&key, "%s:%li", socketAddress, socketPort); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key); + if (entry == NULL) { + entry = calloc(1, sizeof(*entry)); + entry->key = key; + entry->uri = strndup(uri, 1024 * 1024); + entry->socketAddress = strndup(socketAddress, 1024 * 1024); + entry->socketPort = socketPort; + entry->connected = false; + entry->statically = false; + entry->recvBuffer = &receiver->recvBuffer; + hashMap_put(receiver->requestedConnections.map, (void*)entry->uri, entry); + receiver->requestedConnections.allConnected = false; + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + psa_websocket_connectToAllRequestedConnections(receiver); +} + +void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t *receiver, const char *uri) { + L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s disconnect from websocket uri %s", receiver->scope, receiver->topic, uri); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_websocket_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, uri); + if (entry != NULL && entry->connected) { + mg_close_connection(entry->sockConnection); + L_WARN("[PSA_WEBSOCKET] Error disconnecting from websocket uri %s.", uri); + } + if (entry != NULL) { + free(entry->socketAddress); + free(entry->uri); + 
free(entry->key); + free(entry); + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); +} + +static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { + pubsub_websocket_topic_receiver_t *receiver = handle; + + long bndId = celix_bundle_getId(bnd); + const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { + //not the same scope. ignore + return; + } + + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); + if (entry != NULL) { + entry->usageCount += 1; + } else { + //new create entry + entry = calloc(1, sizeof(*entry)); + entry->usageCount = 1; + entry->svc = svc; + entry->initialized = false; + + int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes); + + if (rc == 0) { + entry->metrics = hashMap_create(NULL, NULL, NULL, NULL); + hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); + hash_map_t *origins = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + hashMap_put(entry->metrics, (void*)(uintptr_t)msgSer->msgId, origins); + } + } + + if (rc == 0) { + hashMap_put(receiver->subscribers.map, (void*)bndId, entry); + } else { + L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + free(entry); + } + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props __attribute__((unused)), const celix_bundle_t *bnd) { + pubsub_websocket_topic_receiver_t 
*receiver = handle; + + long bndId = celix_bundle_getId(bnd); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); + if (entry != NULL) { + entry->usageCount -= 1; + } + if (entry != NULL && entry->usageCount <= 0) { + //remove entry + hashMap_remove(receiver->subscribers.map, (void*)bndId); + int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); + if (rc != 0) { + L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + } + hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter)) { + hash_map_t *origins = hashMapIterator_nextValue(&iter); + hashMap_destroy(origins, true, true); + } + hashMap_destroy(entry->metrics, false, false); + free(entry); + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, const pubsub_websocket_msg_header_t *hdr, const void* payload, size_t payloadSize, struct timespec *receiveTime) { + //NOTE receiver->subscribers.mutex locked + pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type)); + pubsub_subscriber_t *svc = entry->svc; + bool monitor = receiver->metricsEnabled; + + //monitoring + struct timespec beginSer; + struct timespec endSer; + int updateReceiveCount = 0; + int updateSerError = 0; + + if (msgSer!= NULL) { + void *deserializedMsg = NULL; + bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion, hdr); + if (validVersion) { + if (monitor) { + clock_gettime(CLOCK_REALTIME, &beginSer); + } + celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg); + if (monitor) { + clock_gettime(CLOCK_REALTIME, &endSer); + } + if (status == 
CELIX_SUCCESS) { + bool release = true; + svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release); + if (release) { + msgSer->freeMsg(msgSer->handle, deserializedMsg); + } + updateReceiveCount += 1; + } else { + updateSerError += 1; + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic); + } + } + } else { + L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X", hdr->type); + } + + if (msgSer != NULL && monitor) { + hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )hdr->type); + char uuidStr[UUID_STR_LEN+1]; + uuid_unparse(hdr->originUUID, uuidStr); + psa_websocket_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr); + + if (metrics == NULL) { + metrics = calloc(1, sizeof(*metrics)); + hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics); + uuid_copy(metrics->origin, hdr->originUUID); + metrics->msgTypeId = hdr->type; + metrics->maxDelayInSeconds = -INFINITY; + metrics->minDelayInSeconds = INFINITY; + metrics->lastSeqNr = 0; + } + + double diff = celix_difftime(&beginSer, &endSer); + long n = metrics->nrOfMessagesReceived; + metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1); + + diff = celix_difftime(&metrics->lastMessageReceived, receiveTime); + n = metrics->nrOfMessagesReceived; + if (metrics->nrOfMessagesReceived >= 1) { + metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1); + } + metrics->lastMessageReceived = *receiveTime; + + + int incr = hdr->seqNr - metrics->lastSeqNr; + if (metrics->lastSeqNr >0 && incr > 1) { + metrics->nrOfMissingSeqNumbers += (incr - 1); + L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr); + } + metrics->lastSeqNr = hdr->seqNr; + + struct timespec sendTime; + sendTime.tv_sec = (time_t)hdr->sendtimeSeconds; + 
sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct + diff = celix_difftime(&sendTime, receiveTime); + metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1); + if (diff < metrics->minDelayInSeconds) { + metrics->minDelayInSeconds = diff; + } + if (diff > metrics->maxDelayInSeconds) { + metrics->maxDelayInSeconds = diff; + } + + metrics->nrOfMessagesReceived += updateReceiveCount; + metrics->nrOfSerializationErrors += updateSerError; + } +} + +static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const pubsub_websocket_msg_header_t *hdr, const char *payload, size_t payloadSize, struct timespec *receiveTime) { + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime); + } + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static void* psa_websocket_recvThread(void * data) { + pubsub_websocket_topic_receiver_t *receiver = data; + + celixThreadMutex_lock(&receiver->recvThread.mutex); + bool running = receiver->recvThread.running; + celixThreadMutex_unlock(&receiver->recvThread.mutex); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + bool allConnected = receiver->requestedConnections.allConnected; + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + bool allInitialized = receiver->subscribers.allInitialized; + celixThreadMutex_unlock(&receiver->subscribers.mutex); + + + while (running) { + if (!allConnected) { + psa_websocket_connectToAllRequestedConnections(receiver); + } + if (!allInitialized) { + psa_websocket_initializeAllSubscribers(receiver); + } + + 
while(celix_arrayList_size(receiver->recvBuffer.list) > 0) { + celixThreadMutex_lock(&receiver->recvBuffer.mutex); + pubsub_websocket_msg_t *msg = (pubsub_websocket_msg_t *) celix_arrayList_get(receiver->recvBuffer.list, 0); + struct timespec *rcvTime = (struct timespec *) celix_arrayList_get(receiver->recvBuffer.rcvTimes, 0); + celixThreadMutex_unlock(&receiver->recvBuffer.mutex); + + processMsg(receiver, &msg->header, msg->payload, msg->payloadSize, rcvTime); + free(msg); + free(rcvTime); + + celixThreadMutex_lock(&receiver->recvBuffer.mutex); + celix_arrayList_removeAt(receiver->recvBuffer.list, 0); + celix_arrayList_removeAt(receiver->recvBuffer.rcvTimes, 0); + celixThreadMutex_unlock(&receiver->recvBuffer.mutex); + } + + celixThreadMutex_lock(&receiver->recvThread.mutex); + running = receiver->recvThread.running; + celixThreadMutex_unlock(&receiver->recvThread.mutex); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + allConnected = receiver->requestedConnections.allConnected; + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + allInitialized = receiver->subscribers.allInitialized; + celixThreadMutex_unlock(&receiver->subscribers.mutex); + } // while + + return NULL; +} + +static int psa_websocketTopicReceiver_data(struct mg_connection *connection __attribute__((unused)), + int op_code __attribute__((unused)), + char *data, + size_t length, + void *handle) { + //Received a websocket message, append this message to the buffer of the receiver. 
+ if (handle != NULL) { + psa_websocket_requested_connection_entry_t *entry = (psa_websocket_requested_connection_entry_t *) handle; + + pubsub_websocket_msg_t *rcvdMsg = malloc(length); + memcpy(rcvdMsg, data, length); + + //Check if payload is completely received + unsigned long rcvdPayloadSize = length - sizeof(rcvdMsg->header) - sizeof(rcvdMsg->payloadSize); + if(rcvdMsg->payloadSize == rcvdPayloadSize) { + celixThreadMutex_lock(&entry->recvBuffer->mutex); + celix_arrayList_add(entry->recvBuffer->list, rcvdMsg); + struct timespec *receiveTime = malloc(sizeof(*receiveTime)); + clock_gettime(CLOCK_REALTIME, receiveTime); + celix_arrayList_add(entry->recvBuffer->rcvTimes, receiveTime); + celixThreadMutex_unlock(&entry->recvBuffer->mutex); + } else { + free(rcvdMsg); + } + } + + return 1; //keep open (non-zero), 0 to close the socket +} + +static void psa_websocketTopicReceiver_close(const struct mg_connection *connection __attribute__((unused)), void *handle) { + //Reset connection for this receiver entry + if (handle != NULL) { + psa_websocket_requested_connection_entry_t *entry = (psa_websocket_requested_connection_entry_t *) handle; + entry->connected = false; + entry->sockConnection = NULL; + } +} + +pubsub_admin_receiver_metrics_t* pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t *receiver) { + pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); + snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); + + size_t msgTypesCount = 0; + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter2)) { + 
hashMapIterator_nextValue(&iter2); + msgTypesCount += 1; + } + } + + result->nrOfMsgTypes = (unsigned long)msgTypesCount; + result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes)); + int i = 0; + iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_t *origins = hashMapIterator_nextValue(&iter2); + result->msgTypes[i].origins = calloc((size_t)hashMap_size(origins), sizeof(*(result->msgTypes[i].origins))); + result->msgTypes[i].nrOfOrigins = hashMap_size(origins); + int k = 0; + hash_map_iterator_t iter3 = hashMapIterator_construct(origins); + while (hashMapIterator_hasNext(&iter3)) { + psa_websocket_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3); + result->msgTypes[i].typeId = metrics->msgTypeId; + pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void*)(uintptr_t)metrics->msgTypeId); + if (msgSer) { + snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName); + uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin); + result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived; + result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors; + result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds; + result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds; + result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds; + result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds; + result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds; + 
result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived; + result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers; + + k += 1; + } else { + L_WARN("[PSA_WEBSOCKET]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId); + } + } + i +=1 ; + } + } + + celixThreadMutex_unlock(&receiver->subscribers.mutex); + + return result; +} + + +static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver) { + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + if (!receiver->requestedConnections.allConnected) { + bool allConnected = true; + hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (!entry->connected) { + char errBuf[100] = {0}; + entry->sockConnection = mg_connect_websocket_client(entry->socketAddress, + (int) entry->socketPort, + 0, // No ssl + errBuf, + (size_t) sizeof(errBuf), + entry->uri, + NULL, + psa_websocketTopicReceiver_data, + psa_websocketTopicReceiver_close, + entry); + if(entry->sockConnection != NULL) { + entry->connected = true; + entry->connectRetryCount = 0; + } else { + entry->connectRetryCount += 1; + allConnected = false; + if((entry->connectRetryCount % 10) == 0) { + L_WARN("[PSA_WEBSOCKET] Error connecting to websocket %s:%li/%s. 
Error: %s", + entry->socketAddress, + entry->socketPort, + entry->uri, errBuf); + } + } + } + } + receiver->requestedConnections.allConnected = allConnected; + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); +} + +static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver) { + celixThreadMutex_lock(&receiver->subscribers.mutex); + if (!receiver->subscribers.allInitialized) { + bool allInitialized = true; + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + if (!entry->initialized) { + int rc = 0; + if (entry->svc != NULL && entry->svc->init != NULL) { + rc = entry->svc->init(entry->svc->handle); + } + if (rc == 0) { + entry->initialized = true; + } else { + L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); + allInitialized = false; + } + } + } + receiver->subscribers.allInitialized = allInitialized; + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h new file mode 100644 index 0000000..1a00cfa --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H +#define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H + +#include <pubsub_admin_metrics.h> +#include "celix_bundle_context.h" + +typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t; + +pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bundle_context_t *ctx, + log_helper_t *logHelper, + const char *scope, + const char *topic, + const celix_properties_t *topicProperties, + long serializerSvcId, + pubsub_serializer_service_t *serializer); +void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver); + +const char* pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t *receiver); +const char* pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t *receiver); + +long pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t *receiver); +void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls); + +void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *receiver, const char *socketAddress, long socketPort, const char *uri); +void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t *receiver, const char *uri); + + +pubsub_admin_receiver_metrics_t* pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t *receiver); + + +#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c new file mode 100644 index 0000000..42f66c7 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <pubsub_serializer.h> +#include <stdlib.h> +#include <memory.h> +#include <pubsub_constants.h> +#include <pubsub/publisher.h> +#include <utils.h> +#include <zconf.h> +#include <log_helper.h> +#include "pubsub_websocket_topic_sender.h" +#include "pubsub_psa_websocket_constants.h" +#include "pubsub_websocket_common.h" +#include <uuid/uuid.h> +#include <constants.h> +#include "http_admin/api.h" +#include "civetweb.h" + +#define FIRST_SEND_DELAY_IN_SECONDS 2 + +#define L_DEBUG(...) \ + logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) +#define L_INFO(...) \ + logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) +#define L_WARN(...) \ + logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) +#define L_ERROR(...) 
/**
 * State of one websocket topic sender (one scope/topic combination).
 */
struct pubsub_websocket_topic_sender {
    celix_bundle_context_t *ctx; //bundle context used to register/unregister the services below
    log_helper_t *logHelper; //used by the L_DEBUG/L_INFO/L_WARN/L_ERROR macros
    long serializerSvcId; //returned by pubsub_websocketTopicSender_serializerSvcId
    pubsub_serializer_service_t *serializer; //used to create/destroy the serializer map per requesting bundle
    uuid_t fwUUID; //parsed from the framework uuid property; copied into every message header as origin
    bool metricsEnabled; //read from PSA_WEBSOCKET_METRICS_ENABLED when the sender is created

    char *scope; //owned copy, freed in destroy
    char *topic; //owned copy, freed in destroy
    char scopeAndTopicFilter[5]; //filled by psa_websocket_setScopeAndTopicFilter
    char *uri; //result of psa_websocket_createURI(scope, topic), freed in destroy

    celix_websocket_service_t websockSvc; //websocket service (ready/close callbacks) registered for uri
    long websockSvcId; //service id of websockSvc, -1 if no uri could be created
    struct mg_connection *sockConnection; //set in the ready callback, reset to NULL in the close callback

    struct {
        long svcId;
        celix_service_factory_t factory; //hands out a publisher service per requesting bundle
    } publisher;

    struct {
        celix_thread_mutex_t mutex; //protects map
        hash_map_t *map; //key = bndId, value = psa_websocket_bounded_service_entry_t
    } boundedServices;
};
delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender); + +static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg); + +static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle); +static void psa_websocketTopicSender_close(const struct mg_connection *connection, void *handle); + +pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create( + celix_bundle_context_t *ctx, + log_helper_t *logHelper, + const char *scope, + const char *topic, + long serializerSvcId, + pubsub_serializer_service_t *ser) { + pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender)); + sender->ctx = ctx; + sender->logHelper = logHelper; + sender->serializerSvcId = serializerSvcId; + sender->serializer = ser; + psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter); + const char* uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); + if (uuid != NULL) { + uuid_parse(uuid, sender->fwUUID); + } + sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED); + + sender->uri = psa_websocket_createURI(scope, topic); + + if (sender->uri != NULL) { + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, WEBSOCKET_ADMIN_URI, sender->uri); + + sender->websockSvc.handle = sender; + sender->websockSvc.ready = psa_websocketTopicSender_ready; + sender->websockSvc.close = psa_websocketTopicSender_close; + sender->websockSvcId = celix_bundleContext_registerService(ctx, &sender->websockSvc, + WEBSOCKET_ADMIN_SERVICE_NAME, props); + } else { + sender->websockSvcId = -1; + } + + if (sender->websockSvcId > 0) { + sender->scope = strndup(scope, 1024 * 1024); + sender->topic = strndup(topic, 1024 * 1024); + + celixThreadMutex_create(&sender->boundedServices.mutex, NULL); + sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, 
NULL); + } + + //register publisher services using a service factory + if (sender->websockSvcId > 0) { + sender->publisher.factory.handle = sender; + sender->publisher.factory.getService = psa_websocket_getPublisherService; + sender->publisher.factory.ungetService = psa_websocket_ungetPublisherService; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic); + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); + + celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + opts.factory = &sender->publisher.factory; + opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; + opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION; + opts.properties = props; + + sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts); + } + + if (sender->websockSvcId < 0) { + free(sender); + sender = NULL; + } + + return sender; +} + +void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender) { + if (sender != NULL) { + celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); + + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter2)) { + psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2); + celixThreadMutex_destroy(&msgEntry->metrics.mutex); + free(msgEntry); + + } + hashMap_destroy(entry->msgEntries, false, false); + + free(entry); + } + } + hashMap_destroy(sender->boundedServices.map, false, false); + 
celixThreadMutex_unlock(&sender->boundedServices.mutex); + + celixThreadMutex_destroy(&sender->boundedServices.mutex); + + celix_bundleContext_unregisterService(sender->ctx, sender->websockSvcId); + + free(sender->scope); + free(sender->topic); + free(sender->uri); + free(sender); + } +} + +long pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t *sender) { + return sender->serializerSvcId; +} + +const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t *sender) { + return sender->scope; +} + +const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t *sender) { + return sender->topic; +} + +const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sender) { + return sender->uri; +} + + +static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { + pubsub_websocket_topic_sender_t *sender = handle; + long bndId = celix_bundle_getId(requestingBundle); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + psa_websocket_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId); + if (entry != NULL) { + entry->getCount += 1; + } else { + entry = calloc(1, sizeof(*entry)); + entry->getCount = 1; + entry->parent = sender; + entry->bndId = bndId; + entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL); + + int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes); + if (rc == 0) { + hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter); + void *key = hashMapEntry_getKey(hashMapEntry); + psa_websocket_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry)); + sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry); + sendEntry->header.type = 
(int32_t)sendEntry->msgSer->msgId; + int major; + int minor; + version_getMajor(sendEntry->msgSer->msgVersion, &major); + version_getMinor(sendEntry->msgSer->msgVersion, &minor); + sendEntry->header.major = (uint8_t)major; + sendEntry->header.minor = (uint8_t)minor; + uuid_copy(sendEntry->header.originUUID, sender->fwUUID); + celixThreadMutex_create(&sendEntry->metrics.mutex, NULL); + hashMap_put(entry->msgEntries, key, sendEntry); + } + entry->service.handle = entry; + entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType; + entry->service.send = psa_websocket_topicPublicationSend; + hashMap_put(sender->boundedServices.map, (void*)bndId, entry); + } else { + L_ERROR("Error creating serializer map for websocket TopicSender %s/%s", sender->scope, sender->topic); + } + } + celixThreadMutex_unlock(&sender->boundedServices.mutex); + + return &entry->service; +} + +static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { + pubsub_websocket_topic_sender_t *sender = handle; + long bndId = celix_bundle_getId(requestingBundle); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + psa_websocket_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void*)bndId); + if (entry != NULL) { + entry->getCount -= 1; + } + if (entry != NULL && entry->getCount == 0) { + //free entry + hashMap_remove(sender->boundedServices.map, (void*)bndId); + int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); + if (rc != 0) { + L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + } + + hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter); + celixThreadMutex_destroy(&msgEntry->metrics.mutex); + 
free(msgEntry); + } + hashMap_destroy(entry->msgEntries, false, false); + + free(entry); + } + celixThreadMutex_unlock(&sender->boundedServices.mutex); +} + +pubsub_admin_sender_metrics_t* pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender) { + pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); + snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); + celixThreadMutex_lock(&sender->boundedServices.mutex); + size_t count = 0; + hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter2)) { + hashMapIterator_nextValue(&iter2); + count += 1; + } + } + + result->msgMetrics = calloc(count, sizeof(*result)); + + iter = hashMapIterator_construct(sender->boundedServices.map); + int i = 0; + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter2)) { + psa_websocket_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2); + celixThreadMutex_lock(&mEntry->metrics.mutex); + result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend; + result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed; + result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors; + result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds; + result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds; + result->msgMetrics[i].lastMessageSend = 
mEntry->metrics.lastMessageSend; + result->msgMetrics[i].bndId = entry->bndId; + result->msgMetrics[i].typeId = mEntry->header.type; + snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName); + i += 1; + celixThreadMutex_unlock(&mEntry->metrics.mutex); + } + } + + celixThreadMutex_unlock(&sender->boundedServices.mutex); + result->nrOfmsgMetrics = (int)count; + return result; +} + +static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) { + int status = CELIX_SERVICE_EXCEPTION; + psa_websocket_bounded_service_entry_t *bound = handle; + pubsub_websocket_topic_sender_t *sender = bound->parent; + bool monitor = sender->metricsEnabled; + psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId)); + + //metrics updates + struct timespec sendTime; + struct timespec serializationStart; + struct timespec serializationEnd; + //int unknownMessageCountUpdate = 0; + int sendErrorUpdate = 0; + int serializationErrorUpdate = 0; + int sendCountUpdate = 0; + + if (sender->sockConnection != NULL && entry != NULL) { + delay_first_send_for_late_joiners(sender); + + if (monitor) { + clock_gettime(CLOCK_REALTIME, &serializationStart); + } + + void *serializedOutput = NULL; + size_t serializedOutputLen = 0; + status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen); + + if (monitor) { + clock_gettime(CLOCK_REALTIME, &serializationEnd); + } + + if (status == CELIX_SUCCESS /*ser ok*/) { + unsigned char *hdrEncoded = calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char)); + + celixThreadMutex_lock(&entry->sendLock); + + pubsub_websocket_msg_t *msg = malloc(sizeof(*msg) + sizeof(char[serializedOutputLen])); + pubsub_websocket_msg_header_t *msgHdr = &entry->header; + if (monitor) { + clock_gettime(CLOCK_REALTIME, &sendTime); + msgHdr->sendtimeSeconds = (uint64_t) sendTime.tv_sec; + 
msgHdr->sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec; + msgHdr->seqNr++; + } + memcpy(&msg->header, msgHdr, sizeof(pubsub_websocket_msg_header_t)); + + msg->payloadSize = (unsigned int) serializedOutputLen; + size_t hdr_size = sizeof(msg->header); + size_t ps_size = sizeof(msg->payloadSize); + size_t bytes_to_write = hdr_size + ps_size + serializedOutputLen;//sizeof(*msg); + memcpy(msg->payload, serializedOutput, serializedOutputLen); + int bytes_written = mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, (char *) msg, bytes_to_write); + + celixThreadMutex_unlock(&entry->sendLock); + if (bytes_written == (int) bytes_to_write) { + sendCountUpdate = 1; + } else { + sendErrorUpdate = 1; + L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket."); + } + + free(msg); + free(hdrEncoded); + free(serializedOutput); + } else { + serializationErrorUpdate = 1; + L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s", + entry->msgSer->msgName, sender->scope, sender->topic); + } + } else if (entry == NULL){ + //unknownMessageCountUpdate = 1; + L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic); + } + + + if (monitor && status == CELIX_SUCCESS) { + celixThreadMutex_lock(&entry->metrics.mutex); + + long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed; + double diff = celix_difftime(&serializationStart, &serializationEnd); + double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1); + entry->metrics.averageSerializationTimeInSeconds = average; + + if (entry->metrics.nrOfMessagesSend > 2) { + diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime); + n = entry->metrics.nrOfMessagesSend; + average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1); + entry->metrics.averageTimeBetweenMessagesInSeconds = average; + } + + entry->metrics.lastMessageSend = 
sendTime; + entry->metrics.nrOfMessagesSend += sendCountUpdate; + entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate; + entry->metrics.nrOfSerializationErrors += serializationErrorUpdate; + + celixThreadMutex_unlock(&entry->metrics.mutex); + } + + return status; +} + +static void psa_websocketTopicSender_ready(struct mg_connection *connection, void *handle) { + //Connection succeeded so save connection to use for sending the messages + pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t *) handle; + sender->sockConnection = connection; +} + +static void psa_websocketTopicSender_close(const struct mg_connection *connection __attribute__((unused)), void *handle) { + //Connection closed so reset connection + pubsub_websocket_topic_sender_t *sender = (pubsub_websocket_topic_sender_t *) handle; + sender->sockConnection = NULL; +} + +static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender) { + + static bool firstSend = true; + + if (firstSend) { + L_INFO("PSA_WEBSOCKET_TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY_IN_SECONDS); + firstSend = false; + } +} diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h new file mode 100644 index 0000000..35229a8 --- /dev/null +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H +#define CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H + +#include "celix_bundle_context.h" +#include "pubsub_admin_metrics.h" + +typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t; + +pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create( + celix_bundle_context_t *ctx, + log_helper_t *logHelper, + const char *scope, + const char *topic, + long serializerSvcId, + pubsub_serializer_service_t *ser); +void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender); + +const char* pubsub_websocketTopicSender_scope(pubsub_websocket_topic_sender_t *sender); +const char* pubsub_websocketTopicSender_topic(pubsub_websocket_topic_sender_t *sender); +const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sender); + +long pubsub_websocketTopicSender_serializerSvcId(pubsub_websocket_topic_sender_t *sender); + +/** + * Returns an array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle sent with the topic sender. 
+ */ +pubsub_admin_sender_metrics_t* pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender); + +#endif //CELIX_PUBSUB_WEBSOCKET_TOPIC_SENDER_H diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt index d37eb23..957745b 100644 --- a/bundles/pubsub/test/CMakeLists.txt +++ b/bundles/pubsub/test/CMakeLists.txt @@ -172,6 +172,27 @@ add_test(NAME pubsub_tcp_endpoint_tests COMMAND pubsub_tcp_endpoint_tests WORKIN SETUP_TARGET_FOR_COVERAGE(pubsub_tcp_endpoint_tests_cov pubsub_tcp_endpoint_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_tcp_endpoint_tests/pubsub_tcp_endpoint_tests ..) endif() +add_celix_container(pubsub_websocket_tests + USE_CONFIG + LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc + DIR ${CMAKE_CURRENT_BINARY_DIR} + PROPERTIES + LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true + USE_WEBSOCKETS=true + LISTENING_PORTS=8080 + BUNDLES + Celix::pubsub_serializer_json + Celix::http_admin + Celix::pubsub_topology_manager + Celix::pubsub_admin_websocket + pubsub_sut + pubsub_tst +) +target_link_libraries(pubsub_websocket_tests PRIVATE Celix::pubsub_api ${CPPUTEST_LIBRARIES} ${JANSSON_LIBRARIES} Celix::dfi) +target_include_directories(pubsub_websocket_tests PRIVATE ${CPPUTEST_INCLUDE_DIR}) +add_test(NAME pubsub_websocket_tests COMMAND pubsub_websocket_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_websocket_tests,CONTAINER_LOC>) +SETUP_TARGET_FOR_COVERAGE(pubsub_websocket_tests_cov pubsub_websocket_tests ${CMAKE_BINARY_DIR}/coverage/pubsub_websocket_tests/pubsub_websocket_tests ..) + if (BUILD_PUBSUB_PSA_ZMQ) add_celix_container(pubsub_zmq_tests USE_CONFIG #ensures that a config.properties will be created with the launch bundles. 
diff --git a/bundles/pubsub/test/meta_data/ping.properties b/bundles/pubsub/test/meta_data/ping.properties index b73435d..2e13c17 100644 --- a/bundles/pubsub/test/meta_data/ping.properties +++ b/bundles/pubsub/test/meta_data/ping.properties @@ -20,6 +20,7 @@ tcp.static.bind.url=tcp://127.0.0.1:9000 tcp.static.connect.urls=tcp://127.0.0.1:9000 udpmc.static.bind.port=50678 udpmc.static.connect.socket_addresses=224.100.0.1:50678 +websocket.static.connect.socket_addresses=127.0.0.1:8080 #note only effective if run as root thread.realtime.shed=SCHED_FIFO