Nanomsg: moved charptr to std::string
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/883abeed Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/883abeed Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/883abeed Branch: refs/heads/nanomsg Commit: 883abeed00cfa3c1509b026fea888550b863defc Parents: 120895d Author: Erjan Altena <erjanalt...@gmail.com> Authored: Fri Nov 23 22:35:30 2018 +0100 Committer: Erjan Altena <erjanalt...@gmail.com> Committed: Fri Nov 23 22:35:30 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 56 +++++++----------- .../src/pubsub_nanomsg_admin.h | 4 +- .../src/pubsub_nanomsg_common.cc | 8 +-- .../src/pubsub_nanomsg_common.h | 3 +- .../src/pubsub_nanomsg_topic_receiver.cc | 60 +++++++------------- .../src/pubsub_nanomsg_topic_receiver.h | 26 ++++----- 6 files changed, 63 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 3e788ae..030441d 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -159,7 +159,7 @@ void pubsub_nanomsg_admin::start() { }; adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) { auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->setupTopicReceiver(scope, topic,serializerSvcId, subscriberEndpoint); + return me->setupTopicReceiver(std::string(scope), std::string(topic),serializerSvcId, subscriberEndpoint); }; adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) { @@ -205,7 +205,7 @@ void pubsub_nanomsg_admin::start() { celix_properties_t* shellProps = celix_properties_create(); celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg"); celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg"); - celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the ZMQ PSA"); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA"); cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps); } @@ -275,10 +275,9 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper for (auto kv : topicReceivers.map){ auto *receiver = kv.second; if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) { - char *key = kv.first; + auto key = kv.first; topicReceivers.map.erase(key); delete receiver; - free(key); } } } @@ -338,8 +337,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); sender = topicSenders.map.find(key)->second; if (sender == nullptr) { - //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map, - // (void *) serializerSvcId)); psa_nanomsg_serializer_entry_t *serEntry = nullptr; auto kv = serializers.map.find(serializerSvcId); if (kv != serializers.map.end()) { @@ -403,12 +400,12 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons return status; } -celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const char *topic, +celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) { celix_properties_t *newEndpoint = nullptr; - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + std::string key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str()); pubsub::nanomsg::topic_receiver * receiver = nullptr; { std::lock_guard<std::mutex> serializerLock(serializers.mutex); @@ -423,12 +420,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const auto serEntry = kvs->second; receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc); } else { - L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic); + L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str()); } if (receiver != nullptr) { const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; const char *serType = kvs->second->serType; - newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, + newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr); //if available also set container name const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); @@ -438,11 +435,9 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const topicReceivers.map[key] = receiver; } else { L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver."); - free(key); } } else { - free(key); - L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope, topic); + L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic %s/%s!", scope.c_str(), topic.c_str()); } } if (receiver != nullptr && newEndpoint != nullptr) { @@ -470,11 +465,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co auto entry = topicReceivers.map.find(key); free(key); if (entry != topicReceivers.map.end()) { - char *receiverKey = entry->first; + auto receiverKey = entry->first; pubsub::nanomsg::topic_receiver *receiver = entry->second; topicReceivers.map.erase(receiverKey); - free(receiverKey); delete receiver; } @@ -487,22 +481,18 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg:: //note can be called with discoveredEndpoint.mutex lock celix_status_t status = CELIX_SUCCESS; - const char *scope = receiver->scope(); - const char *topic = receiver->topic(); + auto scope = receiver->scope(); + auto topic = receiver->topic(); - const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr); - const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr); + std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, ""); + std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, ""); const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr); if (url == nullptr) { -// const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, nullptr); -// const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); // L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type); status = CELIX_BUNDLE_EXCEPTION; } else { - if (eScope != nullptr && eTopic != nullptr && - strncmp(eScope, scope, 1024 * 1024) == 0 && - strncmp(eTopic, topic, 1024 * 1024) == 0) { + if ((eScope == scope) && (eTopic == topic)) { receiver->connectTo(url); } } @@ -537,20 +527,18 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nano //note can be called with discoveredEndpoint.mutex lock celix_status_t status = CELIX_SUCCESS; - const char *scope = receiver->scope(); - const char *topic = receiver->topic(); + auto scope = receiver->scope(); + auto topic = receiver->topic(); - const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr); - const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr); + auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, ""); + auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, ""); const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr); if (url == nullptr) { L_WARN("[PSA NANOMSG] Error got endpoint without nanomsg url"); status = CELIX_BUNDLE_EXCEPTION; } else { - if (eScope != nullptr && eTopic != nullptr && - strncmp(eScope, scope, 1024 * 1024) == 0 && - strncmp(eTopic, topic, 1024 * 1024) == 0) { + if ((eScope == scope) && (eTopic == topic)) { receiver->disconnectFrom(url); } } @@ -609,14 +597,14 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut long serSvcId = receiver->serializerSvcId(); auto kv = serializers.map.find(serSvcId); const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType; - const char *scope = receiver->scope(); - const char *topic = receiver->topic(); + auto scope = receiver->scope(); + auto topic = receiver->topic(); std::vector<std::string> connected{}; std::vector<std::string> unconnected{}; receiver->listConnections(connected, unconnected); - fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic); + fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str()); fprintf(out, " |- serializer type = %s\n", serType); for (auto url : connected) { fprintf(out, " |- connected url = %s\n", url.c_str()); http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 3e680b6..b33a3c0 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -74,7 +74,7 @@ private: long serializerSvcId, celix_properties_t **publisherEndpoint); celix_status_t teardownTopicSender(const char *scope, const char *topic); - celix_status_t setupTopicReceiver(const char *scope, const char *topic, + celix_status_t setupTopicReceiver(const std::string &scope, const std::string &topic, long serializerSvcId, celix_properties_t **subscriberEndpoint); celix_status_t teardownTopicReceiver(const char *scope, const char *topic); @@ -117,7 +117,7 @@ private: } psa_nanomsg_serializer_entry_t; ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{}; ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{}; - ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{}; + ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; }; http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc index 2a2bcfe..3ecd19c 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc @@ -41,15 +41,15 @@ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_he return check; } -void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter) { - for (int i = 0; i < 5; ++i) { +void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter) { + for (int i = 0; i < 5; ++i) { // 5 ?? filter[i] = '\0'; } - if (scope != NULL && strnlen(scope, 3) >= 2) { + if (scope.size() >= 2) { //3 ?? filter[0] = scope[0]; filter[1] = scope[1]; } - if (topic != NULL && strnlen(topic, 3) >= 2) { + if (topic.size() >= 2) { //3 ?? filter[2] = topic[0]; filter[3] = topic[1]; } http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h index 3d5d48d..276169f 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h @@ -20,6 +20,7 @@ #ifndef CELIX_PUBSUB_ZMQ_COMMON_H #define CELIX_PUBSUB_ZMQ_COMMON_H +#include <string> #include <utils.h> #include "version.h" @@ -48,7 +49,7 @@ typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t; int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId); -void psa_nanomsg_setScopeAndTopicFilter(const char *scope, const char *topic, char *filter); +void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter); bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr); http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 2205ed2..db8469b 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -65,8 +65,8 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, - const char *_scope, - const char *_topic, + const std::string &_scope, + const std::string &_topic, long _serializerSvcId, pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} { ctx = _ctx; @@ -76,33 +76,22 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS); if (m_nanoMsgSocket < 0) { - // TODO throw error or something - //free(receiver); - //receiver = NULL; - L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic); + L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope.c_str(), m_topic.c_str()); + std::bad_alloc{}; } else { int timeout = PSA_NANOMSG_RECV_TIMEOUT; if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof (timeout)) < 0) { - // TODO throw error or something - //free(receiver); - //receiver = NULL; - L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic); + L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope.c_str(), m_topic.c_str()); + std::bad_alloc{}; } - char subscribeFilter[5]; - psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter); + char subscriberFilter[5]; // 5 ?? + psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscriberFilter); - m_scope = strndup(m_scope, 1024 * 1024); - m_topic = strndup(m_topic, 1024 * 1024); - - //subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); - //std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n"; - //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - - int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); + int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str()); char buf[size + 1]; - snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); + snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str()); celix_service_tracking_options_t opts{}; opts.filter.ignoreServiceLanguage = true; opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; @@ -141,14 +130,13 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() { } nn_close(m_nanoMsgSocket); - free((void*)m_scope); - free((void*)m_topic); } -const char* pubsub::nanomsg::topic_receiver::scope() const { +std::string pubsub::nanomsg::topic_receiver::scope() const { return m_scope; } -const char* pubsub::nanomsg::topic_receiver::topic() const { + +std::string pubsub::nanomsg::topic_receiver::topic() const { return m_topic; } @@ -170,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> & void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { - L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url); + L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url); std::lock_guard<std::mutex> _lock(requestedConnections.mutex); auto entry = requestedConnections.map.find(url); @@ -193,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { } void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { - L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url); + L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url); std::lock_guard<std::mutex> _lock(requestedConnections.mutex); auto entry = requestedConnections.map.find(url); @@ -216,8 +204,8 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { long bndId = celix_bundle_getId(bnd); - const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) { + std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (subScope != m_scope) { //not the same scope. ignore return; } @@ -232,16 +220,10 @@ void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_prope std::forward_as_tuple(bndId), std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1)); entry = subscribers.map.find(bndId); - if (entry == subscribers.map.end()) { - std::cerr << "### THIS IS A VERY CRITICAL ERROR!!\n"; - } int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes); - if (rc == 0) { - //hashMap_put(subscribers.map, (void*)bndId, entry); - //subscribers.map[bndId] = *entry; - } else { - L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic); + if (rc != 0) { + L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str()); subscribers.map.erase(bndId); } } @@ -259,14 +241,14 @@ void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/, //remove entry int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); if (rc != 0) { - L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic); + L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope.c_str(), m_topic.c_str()); } subscribers.map.erase(bndId); } } } -void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) { +void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) { pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type))); pubsub_subscriber_t *svc = entry->svc; http://git-wip-us.apache.org/repos/asf/celix/blob/883abeed/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index 09d62a9..2519e4a 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -28,18 +28,18 @@ #include "pubsub_nanomsg_common.h" #include "pubsub/subscriber.h" -typedef struct psa_nanomsg_subscriber_entry { +struct psa_nanomsg_subscriber_entry { psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) : svc{_svc}, usageCount{_usageCount} { } pubsub_subscriber_t *svc{}; int usageCount; hash_map_t *msgTypes{nullptr}; //map from serializer svc -} psa_nanomsg_subscriber_entry_t; +}; -typedef struct psa_zmq_requested_connection_entry { +typedef struct psa_nanomsg_requested_connection_entry { public: - psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false): + psa_nanomsg_requested_connection_entry(std::string _url, int _id, bool _connected=false): url{_url}, id{_id}, connected{_connected} { } bool isConnected() const { @@ -73,23 +73,23 @@ namespace pubsub { topic_receiver(celix_bundle_context_t *ctx, log_helper_t *logHelper, - const char *scope, - const char *topic, + const std::string &scope, + const std::string &topic, long serializerSvcId, pubsub_serializer_service_t *serializer); topic_receiver(const topic_receiver &) = delete; topic_receiver & operator=(const topic_receiver &) = delete; ~topic_receiver(); - const char* scope() const; - const char* topic() const; + std::string scope() const; + std::string topic() const; long serializerSvcId() const; void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls); void connectTo(const char *url); void disconnectFrom(const char *url); void recvThread_exec(); void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize); - void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize); + void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize); void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd); @@ -98,8 +98,8 @@ namespace pubsub { log_helper_t *logHelper{nullptr}; long m_serializerSvcId{0}; pubsub_serializer_service_t *serializer{nullptr}; - const char *m_scope{nullptr}; - const char *m_topic{nullptr}; + const std::string m_scope{}; + const std::string m_topic{}; char m_scopeAndTopicFilter[5]; int m_nanoMsgSocket{0}; @@ -113,14 +113,12 @@ namespace pubsub { struct { std::mutex mutex; std::map<std::string, psa_nanomsg_requested_connection_entry_t> map; - //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t* } requestedConnections{}; long subscriberTrackerId{0}; struct { std::mutex mutex; - std::map<long, psa_nanomsg_subscriber_entry_t> map; - //hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t + std::map<long, psa_nanomsg_subscriber_entry> map; } subscribers{}; }; }