Repository: celix Updated Branches: refs/heads/nanomsg cb740b0d4 -> 3009e6470
admin mutexes replaced by std::mutex and lock_guard Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95892a85 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95892a85 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95892a85 Branch: refs/heads/nanomsg Commit: 95892a8577107eb4fa93410ae08913d51407763b Parents: cb740b0 Author: Erjan Altena <erjanalt...@gmail.com> Authored: Thu Nov 1 21:17:02 2018 +0100 Committer: Erjan Altena <erjanalt...@gmail.com> Committed: Thu Nov 1 21:17:02 2018 +0100 ---------------------------------------------------------------------- .../src/psa_nanomsg_activator.cc | 4 - .../src/pubsub_nanomsg_admin.cc | 329 +++++++++---------- .../src/pubsub_nanomsg_admin.h | 10 +- 3 files changed, 166 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc index ec3ee7d..e599f01 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc @@ -87,10 +87,6 @@ private: LogHelper logHelper; pubsub_nanomsg_admin admin; - -// command_service_t cmdSvc{}; - -// long cmdSvcId = -1L; }; celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) { http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index bd1d0a5..c10431f 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -105,64 +105,59 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE); qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE); - celixThreadMutex_create(&serializers.mutex, nullptr); serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr); - celixThreadMutex_create(&topicSenders.mutex, nullptr); topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr); - celixThreadMutex_create(&topicReceivers.mutex, nullptr); topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr); - celixThreadMutex_create(&discoveredEndpoints.mutex, nullptr); discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr); } pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { //note assuming al psa register services and service tracker are removed. - - celixThreadMutex_lock(&topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter)); - pubsub_nanoMsgTopicSender_destroy(sender); + { + std::lock_guard<std::mutex> lock(topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter)); + pubsub_nanoMsgTopicSender_destroy(sender); + } } - celixThreadMutex_unlock(&topicSenders.mutex); - celixThreadMutex_lock(&topicReceivers.mutex); - iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); - pubsub_nanoMsgTopicReceiver_destroy(recv); + { + std::lock_guard<std::mutex> lock(topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); + pubsub_nanoMsgTopicReceiver_destroy(recv); + } } - celixThreadMutex_unlock(&topicReceivers.mutex); - celixThreadMutex_lock(&discoveredEndpoints.mutex); - iter = hashMapIterator_construct(discoveredEndpoints.map); - while (hashMapIterator_hasNext(&iter)) { - auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter)); - celix_properties_destroy(ep); + { + std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter)); + celix_properties_destroy(ep); + } } - celixThreadMutex_unlock(&discoveredEndpoints.mutex); - celixThreadMutex_lock(&serializers.mutex); - iter = hashMapIterator_construct(serializers.map); - while (hashMapIterator_hasNext(&iter)) { - auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter)); - free(entry); + { + std::lock_guard<std::mutex> lock(serializers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(serializers.map); + while (hashMapIterator_hasNext(&iter)) { + auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter)); + free(entry); + } } - celixThreadMutex_unlock(&serializers.mutex); - celixThreadMutex_destroy(&topicSenders.mutex); hashMap_destroy(topicSenders.map, true, false); - celixThreadMutex_destroy(&topicReceivers.mutex); hashMap_destroy(topicReceivers.map, true, false); - celixThreadMutex_destroy(&discoveredEndpoints.mutex); hashMap_destroy(discoveredEndpoints.map, false, false); - celixThreadMutex_destroy(&serializers.mutex); hashMap_destroy(serializers.map, false, false); free(ipAddress); @@ -259,16 +254,17 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t return; } - celixThreadMutex_lock(&serializers.mutex); - auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId)); - if (entry == nullptr) { - entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry))); - entry->serType = serType; - entry->svcId = svcId; - entry->svc = static_cast<pubsub_serializer_service_t*>(svc); - hashMap_put(serializers.map, (void*)svcId, entry); + { + std::lock_guard<std::mutex> lock(serializers.mutex); + auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)svcId)); + if (entry == nullptr) { + entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry))); + entry->serType = serType; + entry->svcId = svcId; + entry->svc = static_cast<pubsub_serializer_service_t*>(svc); + hashMap_put(serializers.map, (void*)svcId, entry); + } } - celixThreadMutex_unlock(&serializers.mutex); } @@ -281,40 +277,41 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper // 3) loop and destroy all topic receivers using the serializer // Note that it is the responsibility of the topology manager to create new topic senders/receivers - celixThreadMutex_lock(&serializers.mutex); + std::lock_guard<std::mutex> lock(serializers.mutex); auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map, (void*)svcId)); if (entry != nullptr) { - celixThreadMutex_lock(&topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry)); - if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) { - char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry)); - hashMapIterator_remove(&iter); - pubsub_nanoMsgTopicSender_destroy(sender); - free(key); + { + std::lock_guard<std::mutex> senderLock(topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); + auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry)); + if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) { + char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry)); + hashMapIterator_remove(&iter); + pubsub_nanoMsgTopicSender_destroy(sender); + free(key); + } } } - celixThreadMutex_unlock(&topicSenders.mutex); - celixThreadMutex_lock(&topicReceivers.mutex); - iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry)); - if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) { - char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry)); - hashMapIterator_remove(&iter); - pubsub_nanoMsgTopicReceiver_destroy(receiver); - free(key); + { + std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); + auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry)); + if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) { + char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry)); + hashMapIterator_remove(&iter); + pubsub_nanoMsgTopicReceiver_destroy(receiver); + free(key); + } } } - celixThreadMutex_unlock(&topicReceivers.mutex); free(entry); } - celixThreadMutex_unlock(&serializers.mutex); } celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, @@ -353,7 +350,7 @@ celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *end celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) { - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; //1) Create TopicSender //2) Store TopicSender @@ -363,21 +360,22 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c celix_properties_t *newEndpoint = nullptr; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - - celixThreadMutex_lock(&serializers.mutex); - celixThreadMutex_lock(&topicSenders.mutex); - auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(topicSenders.map, key)); + pubsub_nanomsg_topic_sender_t *sender = nullptr; + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); + sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key)); if (sender == nullptr) { - auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId)); + auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map, + (void *) serializerSvcId)); if (serEntry != nullptr) { - sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, - ipAddress, basePort, maxPort); + sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress, + basePort, maxPort); } if (sender != nullptr) { const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; const char *serType = serEntry->serType; - newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, - serType, nullptr); + newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, + nullptr); celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender)); //if available also set container name const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); @@ -393,9 +391,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c free(key); L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic); } - celixThreadMutex_unlock(&topicSenders.mutex); - celixThreadMutex_unlock(&serializers.mutex); - if (sender != nullptr && newEndpoint != nullptr) { //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender? } @@ -414,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons //2) destroy topic sender char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&topicSenders.mutex); + std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key); if (entry != nullptr) { char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry)); @@ -425,7 +420,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons } else { L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic); } - celixThreadMutex_unlock(&topicSenders.mutex); free(key); return status; @@ -437,41 +431,41 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const celix_properties_t *newEndpoint = nullptr; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&serializers.mutex); - celixThreadMutex_lock(&topicReceivers.mutex); - auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(topicReceivers.map, key)); - if (receiver == nullptr) { - auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serializerSvcId)); - if (serEntry != nullptr) { - receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, - serEntry->svc); - } else { - L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic); - } - if (receiver != nullptr) { - const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; - const char *serType = serEntry->serType; - newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, - PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr); - //if available also set container name - const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); - if (cn != nullptr) { - celix_properties_set(newEndpoint, "container_name", cn); + pubsub_nanomsg_topic_receiver_t * receiver = nullptr; + { + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); + receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key)); + if (receiver == nullptr) { + auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map, + (void *) serializerSvcId)); + if (serEntry != nullptr) { + receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc); + } else { + L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic); + } + if (receiver != nullptr) { + const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; + const char *serType = serEntry->serType; + newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, + serType, nullptr); + //if available also set container name + const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); + if (cn != nullptr) { + celix_properties_set(newEndpoint, "container_name", cn); + } + hashMap_put(topicReceivers.map, key, receiver); + } else { + L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver."); + free(key); } - hashMap_put(topicReceivers.map, key, receiver); } else { - L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver."); free(key); + L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic); } - } else { - free(key); - L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic); } - celixThreadMutex_unlock(&topicReceivers.mutex); - celixThreadMutex_unlock(&serializers.mutex); - if (receiver != nullptr && newEndpoint != nullptr) { - celixThreadMutex_lock(&discoveredEndpoints.mutex); + std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter)); @@ -480,7 +474,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const connectEndpointToReceiver(receiver, endpoint); } } - celixThreadMutex_unlock(&discoveredEndpoints.mutex); } if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) { @@ -493,7 +486,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) { char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - celixThreadMutex_lock(&topicReceivers.mutex); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key); free(key); if (entry != nullptr) { @@ -504,7 +497,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co free(receiverKey); pubsub_nanoMsgTopicReceiver_destroy(receiver); } - celixThreadMutex_lock(&topicReceivers.mutex); celix_status_t status = CELIX_SUCCESS; return status; @@ -542,20 +534,18 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - celixThreadMutex_lock(&topicReceivers.mutex); + std::lock_guard<std::mutex> threadLock(topicReceivers.mutex); hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); while (hashMapIterator_hasNext(&iter)) { pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); connectEndpointToReceiver(receiver, endpoint); } - celixThreadMutex_unlock(&topicReceivers.mutex); } - celixThreadMutex_lock(&discoveredEndpoints.mutex); + std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); celix_properties_t *cpy = celix_properties_copy(endpoint); const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr); hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy); - celixThreadMutex_unlock(&discoveredEndpoints.mutex); celix_status_t status = CELIX_SUCCESS; return status; @@ -592,20 +582,19 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - celixThreadMutex_lock(&topicReceivers.mutex); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); while (hashMapIterator_hasNext(&iter)) { pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); disconnectEndpointFromReceiver(receiver, endpoint); } - celixThreadMutex_unlock(&topicReceivers.mutex); } - - celixThreadMutex_lock(&discoveredEndpoints.mutex); - const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr); - celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid)); - celixThreadMutex_unlock(&discoveredEndpoints.mutex); - + celix_properties_t *found = nullptr; + { + std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr); + found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid)); + } if (found != nullptr) { celix_properties_destroy(found); } @@ -620,52 +609,56 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut fprintf(out, "\n"); fprintf(out, "Topic Senders:\n"); - celixThreadMutex_lock(&serializers.mutex); - celixThreadMutex_lock(&topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter)); - long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender); - psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId)); - const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType; - const char *scope = pubsub_nanoMsgTopicSender_scope(sender); - const char *topic = pubsub_nanoMsgTopicSender_topic(sender); - const char *url = pubsub_nanoMsgTopicSender_url(sender); - fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); - fprintf(out, " |- serializer type = %s\n", serType); - fprintf(out, " |- url = %s\n", url); - } - celixThreadMutex_unlock(&topicSenders.mutex); - celixThreadMutex_unlock(&serializers.mutex); - - fprintf(out, "\n"); - fprintf(out, "\nTopic Receivers:\n"); - celixThreadMutex_lock(&serializers.mutex); - celixThreadMutex_lock(&topicReceivers.mutex); - iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); - long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver); - psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map, (void*)serSvcId)); - const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType; - const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver); - const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver); - - std::vector<std::string> connected{}; - std::vector<std::string> unconnected{}; - pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected); - - fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic); - fprintf(out, " |- serializer type = %s\n", serType); - for (auto url : connected) { - fprintf(out, " |- connected url = %s\n", url.c_str()); + { + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue( + &iter)); + long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender); + psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get( + serializers.map, (void *) serSvcId)); + const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType; + const char *scope = pubsub_nanoMsgTopicSender_scope(sender); + const char *topic = pubsub_nanoMsgTopicSender_topic(sender); + const char *url = pubsub_nanoMsgTopicSender_url(sender); + fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); + fprintf(out, " |- serializer type = %s\n", serType); + fprintf(out, " |- url = %s\n", url); } - for (auto url : unconnected) { - fprintf(out, " |- unconnected url = %s\n", url.c_str()); + } + + { + fprintf(out, "\n"); + fprintf(out, "\nTopic Receivers:\n"); + std::lock_guard<std::mutex> serialerLock(serializers.mutex); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue( + &iter)); + long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver); + psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get( + serializers.map, (void *) serSvcId)); + const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType; + const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver); + const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver); + + std::vector<std::string> connected{}; + std::vector<std::string> unconnected{}; + pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected); + + fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic); + fprintf(out, " |- serializer type = %s\n", serType); + for (auto url : connected) { + fprintf(out, " |- connected url = %s\n", url.c_str()); + } + for (auto url : unconnected) { + fprintf(out, " |- unconnected url = %s\n", url.c_str()); + } } } - celixThreadMutex_unlock(&topicReceivers.mutex); - celixThreadMutex_unlock(&serializers.mutex); fprintf(out, "\n"); return status; http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 385b400..b06c887 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -20,6 +20,7 @@ #ifndef CELIX_PUBSUB_ZMQ_ADMIN_H #define CELIX_PUBSUB_ZMQ_ADMIN_H +#include <mutex> #include <pubsub_admin.h> #include "celix_api.h" #include "log_helper.h" @@ -98,22 +99,22 @@ private: bool verbose{}; struct { - celix_thread_mutex_t mutex; + std::mutex mutex; hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t* } serializers{}; struct { - celix_thread_mutex_t mutex; + std::mutex mutex; hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t* } topicSenders{}; struct { - celix_thread_mutex_t mutex; + std::mutex mutex; hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t* } topicReceivers{}; struct { - celix_thread_mutex_t mutex; + std::mutex mutex; hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint) } discoveredEndpoints{}; @@ -127,7 +128,6 @@ extern "C" { } #endif -celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream, FILE *errStream); #endif //CELIX_PUBSUB_ZMQ_ADMIN_H