subscribers.map is now a std::map
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/120895dd Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/120895dd Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/120895dd Branch: refs/heads/nanomsg Commit: 120895dd93c2c995d98327925fe88f939d345d12 Parents: 8658738 Author: Erjan Altena <erjanalt...@gmail.com> Authored: Fri Nov 23 21:53:50 2018 +0100 Committer: Erjan Altena <erjanalt...@gmail.com> Committed: Fri Nov 23 21:53:50 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_topic_receiver.cc | 100 +++++++------------ .../src/pubsub_nanomsg_topic_receiver.h | 12 ++- 2 files changed, 45 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 8acf6b1..2205ed2 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -63,13 +63,6 @@ #define L_ERROR printf - - - -//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, -// const celix_bundle_t *owner); - - pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope, @@ -103,8 +96,8 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, m_scope = strndup(m_scope, 1024 * 1024); m_topic = strndup(m_topic, 1024 * 1024); - subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); - std::cout << "#### Creating 
subscirbers.map!! " << subscribers.map << "\n"; + //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); + //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n"; //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); @@ -139,38 +132,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() { celix_bundleContext_stopTracker(ctx, subscriberTrackerId); - hash_map_iterator_t iter=hash_map_iterator_t(); { std::lock_guard<std::mutex> _lock(subscribers.mutex); - iter = hashMapIterator_construct(subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter)); - if (entry != NULL) { - serializer->destroySerializerMap(serializer->handle, entry->msgTypes); - free(entry); - } + for(auto elem : subscribers.map) { + serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes); } - hashMap_destroy(subscribers.map, false, false); + subscribers.map.clear(); } - - -// { -// std::lock_guard<std::mutex> _lock(requestedConnections.mutex); -// iter = hashMapIterator_construct(requestedConnections.map); -// while (hashMapIterator_hasNext(&iter)) { -// psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter)); -// if (entry != NULL) { -// free(entry->url); -// free(entry); -// } -// } -// hashMap_destroy(requestedConnections.map, false, false); -// } - - //celixThreadMutex_destroy(&receiver->subscribers.mutex); - //celixThreadMutex_destroy(&receiver->requestedConnections.mutex); - //celixThreadMutex_destroy(&receiver->recvThread.mutex); - nn_close(m_nanoMsgSocket); free((void*)m_scope); @@ -211,6 +179,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { std::piecewise_construct, std::forward_as_tuple(std::string(url)), 
std::forward_as_tuple(url, -1)); + entry = requestedConnections.map.find(url); } if (!entry->second.isConnected()) { int connection_id = nn_connect(m_nanoMsgSocket, url); @@ -254,21 +223,26 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope } std::lock_guard<std::mutex> _lock(subscribers.mutex); - psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId)); - if (entry != NULL) { - entry->usageCount += 1; + auto entry = subscribers.map.find(bndId); + if (entry != subscribers.map.end()) { + entry->second.usageCount += 1; } else { //new create entry - entry = static_cast<psa_nanomsg_subscriber_entry_t*>(calloc(1, sizeof(*entry))); - entry->usageCount = 1; - entry->svc = static_cast<pubsub_subscriber_t*>(svc); + subscribers.map.emplace(std::piecewise_construct, + std::forward_as_tuple(bndId), + std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1)); + entry = subscribers.map.find(bndId); + if (entry == subscribers.map.end()) { + std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n"; + } - int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes); + int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes); if (rc == 0) { - hashMap_put(subscribers.map, (void*)bndId, entry); + //hashMap_put(subscribers.map, (void*)bndId, entry); + //subscribers.map[bndId] = *entry; } else { L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic); - free(entry); + subscribers.map.erase(bndId); } } } @@ -278,18 +252,17 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/, long bndId = celix_bundle_getId(bnd); std::lock_guard<std::mutex> _lock(subscribers.mutex); - psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId)); - if (entry != NULL) { - 
entry->usageCount -= 1; - } - if (entry != NULL && entry->usageCount <= 0) { - //remove entry - hashMap_remove(subscribers.map, (void*)bndId); - int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes); - if (rc != 0) { - L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic); + auto entry = subscribers.map.find(bndId); + if (entry != subscribers.map.end()) { + entry->second.usageCount -= 1; + if (entry->second.usageCount <= 0) { + //remove entry + int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); + if (rc != 0) { + L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic); + } + subscribers.map.erase(bndId); } - free(entry); } } @@ -319,12 +292,13 @@ void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_s void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) { std::lock_guard<std::mutex> _lock(subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter)); - if (entry != NULL) { - processMsgForSubscriberEntry(entry, hdr, payload, payloadSize); - } + //hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map); + //while (hashMapIterator_hasNext(&iter)) { + for (auto entry : subscribers.map) { + //psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter)); + //if (entry != NULL) { + processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize); + //} } } http://git-wip-us.apache.org/repos/asf/celix/blob/120895dd/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h 
---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index 3398fb1..09d62a9 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -28,10 +28,13 @@ #include "pubsub_nanomsg_common.h" #include "pubsub/subscriber.h" -typedef struct psa_zmq_subscriber_entry { +typedef struct psa_nanomsg_subscriber_entry { + psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) : + svc{_svc}, usageCount{_usageCount} { + } + pubsub_subscriber_t *svc{}; int usageCount; - hash_map_t *msgTypes; //map from serializer svc - pubsub_subscriber_t *svc; + hash_map_t *msgTypes{nullptr}; //map from serializer svc } psa_nanomsg_subscriber_entry_t; typedef struct psa_zmq_requested_connection_entry { @@ -116,7 +119,8 @@ namespace pubsub { long subscriberTrackerId{0}; struct { std::mutex mutex; - hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t + std::map<long, psa_nanomsg_subscriber_entry_t> map; + //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t } subscribers{}; }; }