http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c deleted file mode 100644 index fd07310..0000000 --- a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c +++ /dev/null @@ -1,142 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * psa_activator.c - * - * \date Sep 30, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <stdlib.h> - -#include "bundle_activator.h" -#include "service_registration.h" -#include "service_tracker.h" - -#include "pubsub_admin_impl.h" - - -struct activator { - pubsub_admin_pt admin; - pubsub_admin_service_pt adminService; - service_registration_pt registration; - service_tracker_pt serializerTracker; -}; - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator; - - activator = calloc(1, sizeof(*activator)); - if (!activator) { - status = CELIX_ENOMEM; - } - else{ - *userData = activator; - - status = pubsubAdmin_create(context, &(activator->admin)); - - if(status == CELIX_SUCCESS){ - service_tracker_customizer_pt customizer = NULL; - status = serviceTrackerCustomizer_create(activator->admin, - NULL, - pubsubAdmin_serializerAdded, - NULL, - pubsubAdmin_serializerRemoved, - &customizer); - if(status == CELIX_SUCCESS){ - status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker)); - if(status != CELIX_SUCCESS){ - serviceTrackerCustomizer_destroy(customizer); - pubsubAdmin_destroy(activator->admin); - } - } - else{ - pubsubAdmin_destroy(activator->admin); - } - } - } - - return status; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc)); - - if (!pubsubAdminSvc) { - status = CELIX_ENOMEM; - } - else{ - pubsubAdminSvc->admin = activator->admin; - - pubsubAdminSvc->addPublication = pubsubAdmin_addPublication; - pubsubAdminSvc->removePublication = pubsubAdmin_removePublication; - - pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription; - pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription; - - pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; - pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; - - pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint; - - activator->adminService = pubsubAdminSvc; - - status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); - - status += serviceTracker_open(activator->serializerTracker); - - } - - - return status; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - status += serviceTracker_close(activator->serializerTracker); - status += serviceRegistration_unregister(activator->registration); - - activator->registration = NULL; - - free(activator->adminService); - activator->adminService = NULL; - - return status; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - serviceTracker_destroy(activator->serializerTracker); - pubsubAdmin_destroy(activator->admin); - activator->admin = NULL; - - free(activator); - - return status; -} - -
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c deleted file mode 100644 index 29ead0c..0000000 --- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ /dev/null @@ -1,1040 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_admin_impl.c - * - * \date Sep 30, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include "pubsub_admin_impl.h" -#include <zmq.h> - -#include <stdio.h> -#include <stdlib.h> - -#include <arpa/inet.h> -#include <sys/socket.h> -#include <netdb.h> - -#ifndef ANDROID -#include <ifaddrs.h> -#endif - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <string.h> - -#include "constants.h" -#include "utils.h" -#include "hash_map.h" -#include "array_list.h" -#include "bundle_context.h" -#include "bundle.h" -#include "service_reference.h" -#include "service_registration.h" -#include "log_helper.h" -#include "log_service.h" -#include "celix_threads.h" -#include "service_factory.h" - -#include "topic_subscription.h" -#include "topic_publication.h" -#include "pubsub_endpoint.h" -#include "pubsub_utils.h" -#include "subscriber.h" - -#define MAX_KEY_FOLDER_PATH_LENGTH 512 - -static const char *DEFAULT_IP = "127.0.0.1"; - -static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip); -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - -static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc); -static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); -static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); - -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { - celix_status_t status = CELIX_SUCCESS; - -#ifdef BUILD_WITH_ZMQ_SECURITY - if (!zsys_has_curve()){ - printf("PSA_ZMQ: zeromq curve unsupported\n"); - return CELIX_SERVICE_EXCEPTION; - } -#endif - - *admin = calloc(1, sizeof(**admin)); - - if (!*admin) { - status = CELIX_ENOMEM; - } - else{ - - const char *ip = NULL; - char *detectedIp = NULL; - (*admin)->bundle_context= context; - (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); - (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); - arrayList_create(&((*admin)->noSerializerSubscriptions)); - arrayList_create(&((*admin)->noSerializerPublications)); - arrayList_create(&((*admin)->serializerList)); - - celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); - celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); - celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); - celixThreadMutex_create(&(*admin)->serializerListLock, NULL); - celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL); - - celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr); - celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE); - celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr); - - celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr); - celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE); - celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr); - - if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { - logHelper_start((*admin)->loghelper); - } - - bundleContext_getProperty(context,PSA_IP , &ip); - -#ifndef ANDROID - if (ip == NULL) { - const char *interface = NULL; - - bundleContext_getProperty(context, PSA_ITF, &interface); - if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface); - } - - ip = detectedIp; - } -#endif - - if (ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip); - (*admin)->ipAddress = strdup(ip); - } - else { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP); - (*admin)->ipAddress = strdup(DEFAULT_IP); - } - - if (detectedIp != NULL) { - free(detectedIp); - } - - const char* basePortStr = NULL; - const char* maxPortStr = NULL; - char* endptrBase = NULL; - char* endptrMax = NULL; - bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); - bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); - (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); - (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); - if (*endptrBase != '\0') { - (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; - } - if (*endptrMax != '\0') { - (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; - } - - printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); - - // Disable Signal Handling by CZMQ - setenv("ZSYS_SIGHANDLER", "false", true); - - const char *nrZmqThreads = NULL; - bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads); - - if(nrZmqThreads != NULL) { - char *endPtr = NULL; - unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10); - if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) { - zsys_set_io_threads(nrThreads); - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads); - printf("PSA_ZMQ: Using %d threads for ZMQ\n", nrThreads); - } - } - -#ifdef BUILD_WITH_ZMQ_SECURITY - // Setup authenticator - zactor_t* auth = zactor_new (zauth, NULL); - zstr_sendx(auth, "VERBOSE", NULL); - - // Load all public keys of subscribers into the application - // This step is done for authenticating subscribers - char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH]; - char* keys_bundle_dir = pubsub_getKeysBundleDir(context); - snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir); - zstr_sendx (auth, "CURVE", curve_folder_path, NULL); - free(keys_bundle_dir); - - (*admin)->zmq_auth = auth; -#endif - - } - - return status; -} - - -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) -{ - celix_status_t status = CELIX_SUCCESS; - - free(admin->ipAddress); - - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - free((char*)hashMapEntry_getKey(entry)); - arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->pendingSubscriptions,false,false); - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - celixThreadMutex_lock(&admin->subscriptionsLock); - hashMap_destroy(admin->subscriptions,false,false); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - celixThreadMutex_lock(&admin->localPublicationsLock); - hashMap_destroy(admin->localPublications,true,false); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - celixThreadMutex_lock(&admin->externalPublicationsLock); - iter = hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - free((char*)hashMapEntry_getKey(entry)); - arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->externalPublications,false,false); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - - celixThreadMutex_lock(&admin->serializerListLock); - arrayList_destroy(admin->serializerList); - celixThreadMutex_unlock(&admin->serializerListLock); - - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_destroy(admin->noSerializerSubscriptions); - arrayList_destroy(admin->noSerializerPublications); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - - - celixThreadMutex_lock(&admin->usedSerializersLock); - - iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); - while(hashMapIterator_hasNext(iter)){ - arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false); - - iter = hashMapIterator_create(admin->topicPublicationsPerSerializer); - while(hashMapIterator_hasNext(iter)){ - arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->topicPublicationsPerSerializer,false,false); - - celixThreadMutex_unlock(&admin->usedSerializersLock); - - celixThreadMutex_destroy(&admin->usedSerializersLock); - celixThreadMutex_destroy(&admin->serializerListLock); - celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); - - celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr); - celixThreadMutex_destroy(&admin->noSerializerPendingsLock); - - celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr); - celixThreadMutex_destroy(&admin->subscriptionsLock); - - celixThreadMutex_destroy(&admin->localPublicationsLock); - celixThreadMutex_destroy(&admin->externalPublicationsLock); - - logHelper_stop(admin->loghelper); - - logHelper_destroy(&admin->loghelper); - -#ifdef BUILD_WITH_ZMQ_SECURITY - if (admin->zmq_auth != NULL){ - zactor_destroy(&(admin->zmq_auth)); - } -#endif - - free(admin); - - return status; -} - -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - - if(any_sub==NULL){ - - int i; - pubsub_serializer_service_t *best_serializer = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ - status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub); - } - else{ - printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - if (status == CELIX_SUCCESS){ - - /* Connect all internal publishers */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - } - arrayList_destroy(topic_publishers); - } - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Connect also all external publishers */ - celixThreadMutex_lock(&admin->externalPublicationsLock); - hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(extp_iter)){ - array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); - if(ext_pub_list!=NULL){ - for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - } - } - } - hashMapIterator_destroy(extp_iter); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - - - pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); - - status += pubsub_topicSubscriptionStart(any_sub); - - } - - if (status == CELIX_SUCCESS){ - hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); - connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false); - } - - } - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); - - if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ - return pubsubAdmin_addAnySubscription(admin,subEP); - } - - /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - celixThreadMutex_lock(&admin->subscriptionsLock); - celixThreadMutex_lock(&admin->localPublicationsLock); - celixThreadMutex_lock(&admin->externalPublicationsLock); - - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); - - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - - if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic - pubsubAdmin_addSubscriptionToPendingList(admin,subEP); - } - else{ - int i; - topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); - - if(subscription == NULL) { - pubsub_serializer_service_t *best_serializer = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ - status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic, best_serializer, &subscription); - } - else{ - printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - if (status==CELIX_SUCCESS){ - - /* Try to connect internal publishers */ - if(factory!=NULL){ - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); - } - } - arrayList_destroy(topic_publishers); - } - - } - - /* Look also for external publishers */ - if(ext_pub_list!=NULL){ - for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); - } - } - } - - pubsub_topicSubscriptionAddSubscriber(subscription,subEP); - - status += pubsub_topicSubscriptionStart(subscription); - - } - - if(status==CELIX_SUCCESS){ - - hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); - - connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); - } - } - - if (status == CELIX_SUCCESS){ - pubsub_topicIncreaseNrSubscribers(subscription); - } - } - - free(scope_topic); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - celixThreadMutex_unlock(&admin->localPublicationsLock); - celixThreadMutex_unlock(&admin->subscriptionsLock); - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic); - - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); - - celixThreadMutex_lock(&admin->subscriptionsLock); - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL){ - pubsub_topicDecreaseNrSubscribers(sub); - if(pubsub_topicGetNrSubscribers(sub) == 0) { - status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP); - } - } - celixThreadMutex_unlock(&admin->subscriptionsLock); - - if(sub==NULL){ - /* Maybe the endpoint was pending */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){ - status = CELIX_ILLEGAL_STATE; - } - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - free(scope_topic); - - - - return status; - -} - -celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic); - - const char* fwUUID = NULL; - - bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if (fwUUID == NULL) { - printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - - if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { - - celixThreadMutex_lock(&admin->localPublicationsLock); - - service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); - - if (factory == NULL) { - topic_publication_pt pub = NULL; - pubsub_serializer_service_t *best_serializer = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){ - status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub); - } - else{ - printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerPublications,pubEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - if (status == CELIX_SUCCESS) { - status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); - if (status == CELIX_SUCCESS && factory != NULL) { - hashMap_put(admin->localPublications, strdup(scope_topic), factory); - connectTopicPubSubToSerializer(admin, best_serializer, pub, true); - } - } else { - printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); - } - } else { - //just add the new EP to the list - topic_publication_pt pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationAddPublisherEP(pub, pubEP); - } - - celixThreadMutex_unlock(&admin->localPublicationsLock); - } - else{ - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); - if (ext_pub_list == NULL) { - arrayList_create(&ext_pub_list); - hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); - } - - arrayList_add(ext_pub_list, pubEP); - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Re-evaluate the pending subscriptions */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - - hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); - if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. - char* topic = (char*) hashMapEntry_getKey(pendingSub); - array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); - int i; - for (i = 0; i < arrayList_size(pendingSubList); i++) { - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); - pubsubAdmin_addSubscription(admin, subEP); - } - hashMap_remove(admin->pendingSubscriptions, scope_topic); - arrayList_clear(pendingSubList); - arrayList_destroy(pendingSubList); - free(topic); - } - - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - /* Connect the new publisher to the subscription for his topic, if there is any */ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); - if (sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); - if (any_sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); - } - - free(scope_topic); - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ - celix_status_t status = CELIX_SUCCESS; - int count = 0; - - printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic); - - const char* fwUUID = NULL; - - bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - - if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ - - celixThreadMutex_lock(&admin->localPublicationsLock); - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - if(factory!=NULL){ - topic_publication_pt pub = (topic_publication_pt)factory->handle; - pubsub_topicPublicationRemovePublisherEP(pub,pubEP); - } - celixThreadMutex_unlock(&admin->localPublicationsLock); - - if(factory==NULL){ - /* Maybe the endpoint was pending */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){ - status = CELIX_ILLEGAL_STATE; - } - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - } - else{ - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - if(ext_pub_list!=NULL){ - int i; - bool found = false; - for(i=0;!found && i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - found = pubsubEndpoint_equals(pubEP,p); - if (found){ - arrayList_remove(ext_pub_list,i); - } - } - // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) - for(i=0; i<arrayList_size(ext_pub_list);i++) { - pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if (strcmp(pubEP->endpoint,p->endpoint) == 0) { - count++; - } - } - - if(arrayList_size(ext_pub_list)==0){ - hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); - char* topic = (char*)hashMapEntry_getKey(entry); - array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); - hashMap_remove(admin->externalPublications,topic); - arrayList_destroy(list); - free(topic); - } - } - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Check if this publisher was connected to one of our subscribers*/ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint); - } - - free(scope_topic); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_ZMQ: Closing all publications\n"); - - celixThreadMutex_lock(&admin->localPublicationsLock); - char *scope_topic = createScopeTopicKey(scope, topic); - hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); - if(pubsvc_entry!=NULL){ - char* key = (char*)hashMapEntry_getKey(pubsvc_entry); - service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); - topic_publication_pt pub = (topic_publication_pt)factory->handle; - status += pubsub_topicPublicationStop(pub); - disconnectTopicPubSubFromSerializer(admin, pub, true); - status += pubsub_topicPublicationDestroy(pub); - hashMap_remove(admin->localPublications,scope_topic); - free(key); - free(factory); - } - free(scope_topic); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - return status; - -} - -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_ZMQ: Closing all subscriptions\n"); - - celixThreadMutex_lock(&admin->subscriptionsLock); - char *scope_topic = createScopeTopicKey(scope, topic); - hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic); - if(sub_entry!=NULL){ - char* topic = (char*)hashMapEntry_getKey(sub_entry); - - topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry); - status += pubsub_topicSubscriptionStop(ts); - disconnectTopicPubSubFromSerializer(admin, ts, false); - status += pubsub_topicSubscriptionDestroy(ts); - hashMap_remove(admin->subscriptions,scope_topic); - free(topic); - - } - free(scope_topic); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - - -#ifndef ANDROID -static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) { - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - - struct ifaddrs *ifaddr, *ifa; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) != -1) - { - for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == NULL) - continue; - - if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { - if (interface == NULL) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - else if (strcmp(ifa->ifa_name, interface) == 0) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - } - } - - freeifaddrs(ifaddr); - } - - return status; -} -#endif - -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); - array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); - if(pendingListPerTopic==NULL){ - arrayList_create(&pendingListPerTopic); - hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); - } - arrayList_add(pendingListPerTopic,subEP); - free(scope_topic); - return status; -} - -celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){ - /* Assumption: serializers are all available at startup. - * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */ - - celix_status_t status = CELIX_SUCCESS; - int i=0; - - const char *serType = NULL; - serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); - if(serType == NULL){ - printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); - return CELIX_SERVICE_EXCEPTION; - } - - pubsub_admin_pt admin = (pubsub_admin_pt)handle; - celixThreadMutex_lock(&admin->serializerListLock); - arrayList_add(admin->serializerList, reference); - celixThreadMutex_unlock(&admin->serializerListLock); - - /* Now let's re-evaluate the pending */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - - for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){ - pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); - pubsub_serializer_service_t *best_serializer = NULL; - pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); - if(best_serializer != NULL){ /* Finally we have a valid serializer! */ - pubsubAdmin_addSubscription(admin, ep); - } - } - - for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){ - pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); - pubsub_serializer_service_t *best_serializer = NULL; - pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); - if(best_serializer != NULL){ /* Finally we have a valid serializer! */ - pubsubAdmin_addPublication(admin, ep); - } - } - - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - - printf("PSA_ZMQ: %s serializer added\n",serType); - - return status; -} - -celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ - - pubsub_admin_pt admin = (pubsub_admin_pt)handle; - int i=0, j=0; - const char *serType = NULL; - - serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); - if(serType == NULL){ - printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); - return CELIX_SERVICE_EXCEPTION; - } - - celixThreadMutex_lock(&admin->serializerListLock); - /* Remove the serializer from the list */ - arrayList_removeElement(admin->serializerList, reference); - celixThreadMutex_unlock(&admin->serializerListLock); - - - celixThreadMutex_lock(&admin->usedSerializersLock); - array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service); - array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service); - celixThreadMutex_unlock(&admin->usedSerializersLock); - - /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */ - if(topicPubList!=NULL){ - for(i=0;i<arrayList_size(topicPubList);i++){ - topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i); - /* Stop the topic publication */ - pubsub_topicPublicationStop(topicPub); - /* Get the endpoints that are going to be orphan */ - array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub); - for(j=0;j<arrayList_size(pubList);j++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j); - /* Remove the publication */ - pubsubAdmin_removePublication(admin, pubEP); - /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(pubEP->endpoint!=NULL){ - free(pubEP->endpoint); - pubEP->endpoint = NULL; - } - /* Add the orphan endpoint to the noSerializer pending list */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerPublications,pubEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - arrayList_destroy(pubList); - - /* Cleanup also the localPublications hashmap*/ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications); - char *key = NULL; - service_factory_pt factory = NULL; - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - factory = (service_factory_pt)hashMapEntry_getValue(entry); - topic_publication_pt pub = (topic_publication_pt)factory->handle; - if(pub==topicPub){ - key = (char*)hashMapEntry_getKey(entry); - break; - } - } - hashMapIterator_destroy(iter); - if(key!=NULL){ - hashMap_remove(admin->localPublications, key); - free(factory); - free(key); - } - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Finally destroy the topicPublication */ - pubsub_topicPublicationDestroy(topicPub); - } - arrayList_destroy(topicPubList); - } - - /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */ - if(topicSubList!=NULL){ - for(i=0;i<arrayList_size(topicSubList);i++){ - topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i); - /* Stop the topic subscription */ - pubsub_topicSubscriptionStop(topicSub); - /* Get the endpoints that are going to be orphan */ - array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub); - for(j=0;j<arrayList_size(subList);j++){ - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j); - /* Remove the subscription */ - pubsubAdmin_removeSubscription(admin, subEP); - /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(subEP->endpoint!=NULL){ - free(subEP->endpoint); - subEP->endpoint = NULL; - } - /* Add the orphan endpoint to the noSerializer pending list */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - /* Cleanup also the subscriptions hashmap*/ - celixThreadMutex_lock(&admin->subscriptionsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions); - char *key = NULL; - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry); - if(sub==topicSub){ - key = (char*)hashMapEntry_getKey(entry); - break; - } - } - hashMapIterator_destroy(iter); - if(key!=NULL){ - hashMap_remove(admin->subscriptions, key); - free(key); - } - celixThreadMutex_unlock(&admin->subscriptionsLock); - - /* Finally destroy the topicSubscription */ - pubsub_topicSubscriptionDestroy(topicSub); - } - arrayList_destroy(topicSubList); - } - - - - printf("PSA_ZMQ: %s serializer removed\n",serType); - - - return CELIX_SUCCESS; -} - -celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&admin->serializerListLock); - status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score); - celixThreadMutex_unlock(&admin->serializerListLock); - - return status; -} - -/* This one recall the same logic as in the match function */ -static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){ - - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&admin->serializerListLock); - status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc); - celixThreadMutex_unlock(&admin->serializerListLock); - - return status; - -} - -static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ - - celixThreadMutex_lock(&admin->usedSerializersLock); - - hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; - array_list_pt list = (array_list_pt)hashMap_get(map,serializer); - if(list==NULL){ - arrayList_create(&list); - hashMap_put(map,serializer,list); - } - arrayList_add(list,topicPubSub); - - celixThreadMutex_unlock(&admin->usedSerializersLock); - -} - -static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ - - celixThreadMutex_lock(&admin->usedSerializersLock); - - hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; - hash_map_iterator_pt iter = hashMapIterator_create(map); - while(hashMapIterator_hasNext(iter)){ - array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter); - if(arrayList_removeElement(list, topicPubSub)){ //Found it! - break; - } - } - hashMapIterator_destroy(iter); - - celixThreadMutex_unlock(&admin->usedSerializersLock); - -} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c deleted file mode 100644 index e405866..0000000 --- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ /dev/null @@ -1,630 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - -#include <czmq.h> -/* The following undefs prevent the collision between: - * - sys/syslog.h (which is included within czmq) - * - celix/dfi/dfi_log_util.h - */ -#undef LOG_DEBUG -#undef LOG_WARNING -#undef LOG_INFO -#undef LOG_WARNING - -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -#include "array_list.h" -#include "celixbool.h" -#include "service_registration.h" -#include "utils.h" -#include "service_factory.h" -#include "version.h" - -#include "pubsub_common.h" -#include "pubsub_utils.h" -#include "publisher.h" - -#include "topic_publication.h" - -#include "pubsub_serializer.h" - -#ifdef BUILD_WITH_ZMQ_SECURITY - #include "zmq_crypto.h" - - #define MAX_CERT_PATH_LENGTH 512 -#endif - -#define EP_ADDRESS_LEN 32 -#define ZMQ_BIND_MAX_RETRY 5 - -#define FIRST_SEND_DELAY 2 - -struct topic_publication { - zsock_t* zmq_socket; - celix_thread_mutex_t socket_lock; //Protects zmq_socket access - zcert_t * zmq_cert; - char* endpoint; - service_registration_pt svcFactoryReg; - array_list_pt pub_ep_list; //List<pubsub_endpoint> - hash_map_pt boundServices; //<bundle_pt,bound_service> - pubsub_serializer_service_t *serializer; - celix_thread_mutex_t tp_lock; -}; - -typedef struct publish_bundle_bound_service { - topic_publication_pt parent; - pubsub_publisher_t service; - bundle_pt bundle; - char *topic; - hash_map_pt msgTypes; - unsigned short getCount; - celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure - bool mp_send_in_progress; - array_list_pt mp_parts; -}* publish_bundle_bound_service_pt; - -/* Note: correct locking order is - * 1. tp_lock - * 2. mp_lock - * 3. socket_lock - * - * tp_lock and socket_lock are independent. - */ - -typedef struct pubsub_msg{ - pubsub_msg_header_pt header; - char* payload; - int payloadSize; -}* pubsub_msg_pt; - -static unsigned int rand_range(unsigned int min, unsigned int max); - -static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); - -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); - -static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); -static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags); -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); - -static void delay_first_send_for_late_joiners(void); - -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ - celix_status_t status = CELIX_SUCCESS; - -#ifdef BUILD_WITH_ZMQ_SECURITY - char* secure_topics = NULL; - bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics); - - if (secure_topics){ - array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics); - - int i; - int secure_topics_size = arrayList_size(secure_topics_list); - for (i = 0; i < secure_topics_size; i++){ - char* top = arrayList_get(secure_topics_list, i); - if (strcmp(pubEP->topic, top) == 0){ - printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top); - pubEP->is_secure = true; - } - free(top); - top = NULL; - } - - arrayList_destroy(secure_topics_list); - } - - zcert_t* pub_cert = NULL; - if (pubEP->is_secure){ - char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); - if (keys_bundle_dir == NULL){ - return CELIX_SERVICE_EXCEPTION; - } - - const char* keys_file_path = NULL; - const char* keys_file_name = NULL; - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); - - char cert_path[MAX_CERT_PATH_LENGTH]; - - //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key" - snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic); - free(keys_bundle_dir); - printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path); - - pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path); - if (pub_cert == NULL){ - printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path); - printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic); - pubEP->is_secure = false; - } - } -#endif - - zsock_t* socket = zsock_new (ZMQ_PUB); - if(socket==NULL){ - #ifdef BUILD_WITH_ZMQ_SECURITY - if (pubEP->is_secure){ - zcert_destroy(&pub_cert); - } - #endif - - perror("Error for zmq_socket"); - return CELIX_SERVICE_EXCEPTION; - } -#ifdef BUILD_WITH_ZMQ_SECURITY - if (pubEP->is_secure){ - zcert_apply (pub_cert, socket); // apply certificate to socket - zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions - } -#endif - - int rv = -1, retry=0; - char* ep = malloc(EP_ADDRESS_LEN); - char bindAddress[EP_ADDRESS_LEN]; - - while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){ - /* Randomized part due to same bundle publishing on different topics */ - unsigned int port = rand_range(basePort,maxPort); - memset(ep,0,EP_ADDRESS_LEN); - memset(bindAddress, 0, EP_ADDRESS_LEN); - - snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port); - snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address - rv = zsock_bind (socket, "%s", bindAddress); - if (rv == -1) { - perror("Error for zmq_bind"); - } - retry++; - } - - if(rv == -1){ - free(ep); - return CELIX_SERVICE_EXCEPTION; - } - - /* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */ - - topic_publication_pt pub = calloc(1,sizeof(*pub)); - - arrayList_create(&(pub->pub_ep_list)); - pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL); - celixThreadMutex_create(&(pub->tp_lock),NULL); - - pub->endpoint = ep; - pub->zmq_socket = socket; - pub->serializer = best_serializer; - - celixThreadMutex_create(&(pub->socket_lock),NULL); - -#ifdef BUILD_WITH_ZMQ_SECURITY - if (pubEP->is_secure){ - pub->zmq_cert = pub_cert; - } -#endif - - pubsub_topicPublicationAddPublisherEP(pub,pubEP); - - *out = pub; - - return status; -} - -celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&(pub->tp_lock)); - - free(pub->endpoint); - arrayList_destroy(pub->pub_ep_list); - - hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); - pubsub_destroyPublishBundleBoundService(bound); - } - hashMapIterator_destroy(iter); - hashMap_destroy(pub->boundServices,false,false); - - pub->svcFactoryReg = NULL; - pub->serializer = NULL; -#ifdef BUILD_WITH_ZMQ_SECURITY - zcert_destroy(&(pub->zmq_cert)); -#endif - - celixThreadMutex_unlock(&(pub->tp_lock)); - - celixThreadMutex_destroy(&(pub->tp_lock)); - - celixThreadMutex_lock(&(pub->socket_lock)); - zsock_destroy(&(pub->zmq_socket)); - celixThreadMutex_unlock(&(pub->socket_lock)); - - celixThreadMutex_destroy(&(pub->socket_lock)); - - free(pub); - - return status; -} - -celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){ - celix_status_t status = CELIX_SUCCESS; - - /* Let's register the new service */ - - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); - - if(pubEP!=NULL){ - service_factory_pt factory = calloc(1, sizeof(*factory)); - factory->handle = pub; - factory->getService = pubsub_topicPublicationGetService; - factory->ungetService = pubsub_topicPublicationUngetService; - - properties_pt props = properties_create(); - properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); - properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); - properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION); - - status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); - - if(status != CELIX_SUCCESS){ - properties_destroy(props); - printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); - } - else{ - *svcFactory = factory; - } - } - else{ - printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); - status = CELIX_SERVICE_EXCEPTION; - } - - return status; -} - -celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ - return serviceRegistration_unregister(pub->svcFactoryReg); -} - -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ - - celixThreadMutex_lock(&(pub->tp_lock)); - ep->endpoint = strdup(pub->endpoint); - arrayList_add(pub->pub_ep_list,ep); - celixThreadMutex_unlock(&(pub->tp_lock)); - - return CELIX_SUCCESS; -} - -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ - - celixThreadMutex_lock(&(pub->tp_lock)); - for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) { - pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i); - if(pubsubEndpoint_equals(ep, e)) { - arrayList_removeElement(pub->pub_ep_list,ep); - break; - } - } - celixThreadMutex_unlock(&(pub->tp_lock)); - - return CELIX_SUCCESS; -} - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ - array_list_pt list = NULL; - celixThreadMutex_lock(&(pub->tp_lock)); - list = arrayList_clone(pub->pub_ep_list); - celixThreadMutex_unlock(&(pub->tp_lock)); - return list; -} - - -static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { - celix_status_t status = CELIX_SUCCESS; - - topic_publication_pt publish = (topic_publication_pt)handle; - - celixThreadMutex_lock(&(publish->tp_lock)); - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound==NULL){ - bound = pubsub_createPublishBundleBoundService(publish,bundle); - if(bound!=NULL){ - hashMap_put(publish->boundServices,bundle,bound); - } - } - else{ - bound->getCount++; - } - - *service = &bound->service; - - celixThreadMutex_unlock(&(publish->tp_lock)); - - return status; -} - -static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { - - topic_publication_pt publish = (topic_publication_pt)handle; - - celixThreadMutex_lock(&(publish->tp_lock)); - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound!=NULL){ - - bound->getCount--; - if(bound->getCount==0){ - pubsub_destroyPublishBundleBoundService(bound); - hashMap_remove(publish->boundServices,bundle); - } - - } - else{ - long bundleId = -1; - bundle_getBundleId(bundle,&bundleId); - printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId); - } - - /* service should be never used for unget, so let's set the pointer to NULL */ - *service = NULL; - - celixThreadMutex_unlock(&(publish->tp_lock)); - - return CELIX_SUCCESS; -} - -static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){ - - bool ret = true; - - zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header)); - if (headerMsg == NULL) ret=false; - zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize); - if (payloadMsg == NULL) ret=false; - - delay_first_send_for_late_joiners(); - - if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; - - if(!last){ - if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; - } - else{ - if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false; - } - - if (!ret){ - zframe_destroy(&headerMsg); - zframe_destroy(&payloadMsg); - } - - free(msg->header); - free(msg->payload); - free(msg); - - return ret; - -} - -static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){ - - bool ret = true; - - unsigned int i = 0; - unsigned int mp_num = arrayList_size(mp_msg_parts); - for(;i<mp_num;i++){ - ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1)); - } - arrayList_clear(mp_msg_parts); - - return ret; - -} - -static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) { - - return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG); - -} - -static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){ - - int status = 0; - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; - - celixThreadMutex_lock(&(bound->parent->tp_lock)); - celixThreadMutex_lock(&(bound->mp_lock)); - if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg - printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n"); - celixThreadMutex_unlock(&(bound->mp_lock)); - celixThreadMutex_unlock(&(bound->parent->tp_lock)); - return -3; - } - - pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId); - - if (msgSer!= NULL) { - int major=0, minor=0; - - pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); - strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); - msg_hdr->type = msgTypeId; - - if (msgSer->msgVersion != NULL){ - version_getMajor(msgSer->msgVersion, &major); - version_getMinor(msgSer->msgVersion, &minor); - msg_hdr->major = major; - msg_hdr->minor = minor; - } - - void *serializedOutput = NULL; - size_t serializedOutputLen = 0; - msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); - - pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); - msg->header = msg_hdr; - msg->payload = (char*)serializedOutput; - msg->payloadSize = serializedOutputLen; - bool snd = true; - - switch(flags){ - case PUBSUB_PUBLISHER_FIRST_MSG: - bound->mp_send_in_progress = true; - arrayList_add(bound->mp_parts,msg); - break; - case PUBSUB_PUBLISHER_PART_MSG: - if(!bound->mp_send_in_progress){ - printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n"); - status = -4; - } - else{ - arrayList_add(bound->mp_parts,msg); - } - break; - case PUBSUB_PUBLISHER_LAST_MSG: - if(!bound->mp_send_in_progress){ - printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n"); - status = -4; - } - else{ - arrayList_add(bound->mp_parts,msg); - snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts); - bound->mp_send_in_progress = false; - } - break; - case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case - snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true); - break; - default: - printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n"); - status = -4; - break; - } - - if(status==-4){ - free(msg); - } - - if(!snd){ - printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId); - } - - } else { - printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId); - status=-1; - } - - celixThreadMutex_unlock(&(bound->mp_lock)); - celixThreadMutex_unlock(&(bound->parent->tp_lock)); - - return status; - -} - -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = utils_stringHash(msgType); - return 0; -} - - -static unsigned int rand_range(unsigned int min, unsigned int max){ - - double scaled = (double)(((double)random())/((double)RAND_MAX)); - return (max-min+1)*scaled + min; - -} - -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ - - //PRECOND lock on tp->lock - - publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); - - if (bound != NULL) { - - bound->parent = tp; - bound->bundle = bundle; - bound->getCount = 1; - bound->mp_send_in_progress = false; - celixThreadMutex_create(&bound->mp_lock,NULL); - - if(tp->serializer != NULL){ - tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes); - } - - arrayList_create(&bound->mp_parts); - - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); - bound->topic=strdup(pubEP->topic); - - bound->service.handle = bound; - bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; - bound->service.send = pubsub_topicPublicationSend; - bound->service.sendMultipart = pubsub_topicPublicationSendMultipart; - - } - - return bound; -} - -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ - - //PRECOND lock on tp->lock - - celixThreadMutex_lock(&boundSvc->mp_lock); - - - if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){ - boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes); - } - - if(boundSvc->mp_parts!=NULL){ - arrayList_destroy(boundSvc->mp_parts); - } - - if(boundSvc->topic!=NULL){ - free(boundSvc->topic); - } - - celixThreadMutex_unlock(&boundSvc->mp_lock); - celixThreadMutex_destroy(&boundSvc->mp_lock); - - free(boundSvc); - -} - -static void delay_first_send_for_late_joiners(){ - - static bool firstSend = true; - - if(firstSend){ - printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n"); - sleep(FIRST_SEND_DELAY); - firstSend = false; - } -} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c deleted file mode 100644 index 0e7a794..0000000 --- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ /dev/null @@ -1,732 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * topic_subscription.c - * - * \date Oct 2, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include "topic_subscription.h" -#include <czmq.h> -/* The following undefs prevent the collision between: - * - sys/syslog.h (which is included within czmq) - * - celix/dfi/dfi_log_util.h - */ -#undef LOG_DEBUG -#undef LOG_WARNING -#undef LOG_INFO -#undef LOG_WARNING - -#include <string.h> -#include <stdlib.h> -#include <signal.h> - -#include "utils.h" -#include "celix_errno.h" -#include "constants.h" -#include "version.h" - -#include "subscriber.h" -#include "publisher.h" -#include "pubsub_utils.h" - -#ifdef BUILD_WITH_ZMQ_SECURITY -#include "zmq_crypto.h" - -#define MAX_CERT_PATH_LENGTH 512 -#endif - -#define POLL_TIMEOUT 250 -#define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" - -struct topic_subscription{ - - zsock_t* zmq_socket; - zcert_t * zmq_cert; - zcert_t * zmq_pub_cert; - pthread_mutex_t socket_lock; - service_tracker_pt tracker; - array_list_pt sub_ep_list; - celix_thread_t recv_thread; - bool running; - celix_thread_mutex_t ts_lock; - bundle_context_pt context; - - pubsub_serializer_service_t *serializer; - - hash_map_pt servicesMap; // key = service, value = msg types map - - celix_thread_mutex_t pendingConnections_lock; - array_list_pt pendingConnections; - - array_list_pt pendingDisconnections; - celix_thread_mutex_t pendingDisconnections_lock; - - unsigned int nrSubscribers; -}; - -typedef struct complete_zmq_msg{ - zframe_t* header; - zframe_t* payload; -}* complete_zmq_msg_pt; - -typedef struct mp_handle{ - hash_map_pt svc_msg_db; - hash_map_pt rcv_msg_map; -}* mp_handle_pt; - -typedef struct msg_map_entry{ - bool retain; - void* msgInst; -}* msg_map_entry_pt; - -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); -static void* zmq_recv_thread_func(void* arg); -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); -static void sigusr1_sighandler(int signo); -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); -static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); -static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list); -static void destroy_mp_handle(mp_handle_pt mp_handle); -static void connectPendingPublishers(topic_subscription_pt sub); -static void disconnectPendingPublishers(topic_subscription_pt sub); - -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ - celix_status_t status = CELIX_SUCCESS; - -#ifdef BUILD_WITH_ZMQ_SECURITY - char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); - if (keys_bundle_dir == NULL){ - return CELIX_SERVICE_EXCEPTION; - } - - const char* keys_file_path = NULL; - const char* keys_file_name = NULL; - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); - - char sub_cert_path[MAX_CERT_PATH_LENGTH]; - char pub_cert_path[MAX_CERT_PATH_LENGTH]; - - //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" - snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); - snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); - free(keys_bundle_dir); - - printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path); - printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path); - - zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); - if (sub_cert == NULL){ - printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path); - return CELIX_SERVICE_EXCEPTION; - } - - zcert_t* pub_cert = zcert_load(pub_cert_path); - if (pub_cert == NULL){ - zcert_destroy(&sub_cert); - printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path); - return CELIX_SERVICE_EXCEPTION; - } - - const char* pub_key = zcert_public_txt(pub_cert); -#endif - - zsock_t* zmq_s = zsock_new (ZMQ_SUB); - if(zmq_s==NULL){ -#ifdef BUILD_WITH_ZMQ_SECURITY - zcert_destroy(&sub_cert); - zcert_destroy(&pub_cert); -#endif - - return CELIX_SERVICE_EXCEPTION; - } - -#ifdef BUILD_WITH_ZMQ_SECURITY - zcert_apply (sub_cert, zmq_s); - zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber -#endif - - if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){ - zsock_set_subscribe (zmq_s, ""); - } - else{ - zsock_set_subscribe (zmq_s, topic); - } - - topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); - ts->context = bundle_context; - ts->zmq_socket = zmq_s; - ts->running = false; - ts->nrSubscribers = 0; - ts->serializer = best_serializer; - -#ifdef BUILD_WITH_ZMQ_SECURITY - ts->zmq_cert = sub_cert; - ts->zmq_pub_cert = pub_cert; -#endif - - celixThreadMutex_create(&ts->socket_lock, NULL); - celixThreadMutex_create(&ts->ts_lock,NULL); - arrayList_create(&ts->sub_ep_list); - ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); - - arrayList_create(&ts->pendingConnections); - arrayList_create(&ts->pendingDisconnections); - celixThreadMutex_create(&ts->pendingConnections_lock, NULL); - celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); - - char filter[128]; - memset(filter,0,128); - if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { - // default scope, means that subscriber has not defined a scope property - snprintf(filter, 128, "(&(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic); - - } else { - snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic, - PUBSUB_SUBSCRIBER_SCOPE,scope); - } - service_tracker_customizer_pt customizer = NULL; - status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); - status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); - - struct sigaction actions; - memset(&actions, 0, sizeof(actions)); - sigemptyset(&actions.sa_mask); - actions.sa_flags = 0; - actions.sa_handler = sigusr1_sighandler; - - sigaction(SIGUSR1,&actions,NULL); - - *out=ts; - - return status; -} - -celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - - ts->running = false; - serviceTracker_destroy(ts->tracker); - arrayList_clear(ts->sub_ep_list); - arrayList_destroy(ts->sub_ep_list); - /* TODO: Destroy all the serializer maps? */ - hashMap_destroy(ts->servicesMap,false,false); - - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_destroy(ts->pendingConnections); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - celixThreadMutex_destroy(&ts->pendingConnections_lock); - - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_destroy(ts->pendingDisconnections); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - celixThreadMutex_destroy(&ts->pendingDisconnections_lock); - - celixThreadMutex_unlock(&ts->ts_lock); - - celixThreadMutex_lock(&ts->socket_lock); - zsock_destroy(&(ts->zmq_socket)); -#ifdef BUILD_WITH_ZMQ_SECURITY - zcert_destroy(&(ts->zmq_cert)); - zcert_destroy(&(ts->zmq_pub_cert)); -#endif - celixThreadMutex_unlock(&ts->socket_lock); - celixThreadMutex_destroy(&ts->socket_lock); - - celixThreadMutex_destroy(&ts->ts_lock); - - free(ts); - - return status; -} - -celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - status = serviceTracker_open(ts->tracker); - - ts->running = true; - - if(status==CELIX_SUCCESS){ - status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts); - } - - return status; -} - -celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - ts->running = false; - - pthread_kill(ts->recv_thread.thread,SIGUSR1); - - celixThread_join(ts->recv_thread,NULL); - - status = serviceTracker_close(ts->tracker); - - return status; -} - -celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){ - celix_status_t status = CELIX_SUCCESS; - celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,"%s",pubURL) != 0){ - status = CELIX_SERVICE_EXCEPTION; - } - celixThreadMutex_unlock(&ts->socket_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_add(ts->pendingConnections, url); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - return status; -} - -celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_add(ts->pendingDisconnections, url); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - return status; -} - -celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,"%s",pubURL) != 0){ - status = CELIX_SERVICE_EXCEPTION; - } - celixThreadMutex_unlock(&ts->socket_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - arrayList_add(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); - - return status; - -} - -celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers++; - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - arrayList_removeElement(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers--; - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { - return ts->nrSubscribers; -} - -array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){ - return sub->sub_ep_list; -} - -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->servicesMap, service)) { - bundle_pt bundle = NULL; - hash_map_pt msgTypes = NULL; - - serviceReference_getBundle(reference, &bundle); - - if(ts->serializer != NULL && bundle!=NULL){ - ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes); - if(msgTypes != NULL){ - hashMap_put(ts->servicesMap, service, msgTypes); - printf("PSA_ZMQ_TS: New subscriber registered.\n"); - } - } - else{ - printf("PSA_ZMQ_TS: Cannot register new subscriber.\n"); - status = CELIX_SERVICE_EXCEPTION; - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); - if(msgTypes!=NULL && ts->serializer!=NULL){ - ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes); - printf("PSA_ZMQ_TS: Subscriber unregistered.\n"); - } - else{ - printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n"); - status = CELIX_SERVICE_EXCEPTION; - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - - -static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ - - pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); - - hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - hash_map_pt msgTypes = hashMapEntry_getValue(entry); - - pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type); - if (msgSer == NULL) { - printf("PSA_ZMQ_TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type); - } - else{ - void *msgInst = NULL; - bool validVersion = checkVersion(msgSer->msgVersion,first_msg_hdr); - - if(validVersion){ - - celix_status_t status = msgSer->deserialize(msgSer, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst); - - if (status == CELIX_SUCCESS) { - bool release = true; - mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list); - pubsub_multipart_callbacks_t mp_callbacks; - mp_callbacks.handle = mp_handle; - mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; - mp_callbacks.getMultipart = pubsub_getMultipart; - subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release); - - if(release){ - msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst); - } - if(mp_handle!=NULL){ - destroy_mp_handle(mp_handle); - } - } - else{ - printf("PSA_ZMQ_TS: Cannot deserialize msgType %s.\n",msgSer->msgName); - } - - } - else{ - int major=0,minor=0; - version_getMajor(msgSer->msgVersion,&major); - version_getMinor(msgSer->msgVersion,&minor); - printf("PSA_ZMQ_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", - msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor); - } - - } - } - hashMapIterator_destroy(iter); - - int i = 0; - for(;i<arrayList_size(msg_list);i++){ - complete_zmq_msg_pt c_msg = arrayList_get(msg_list,i); - zframe_destroy(&(c_msg->header)); - zframe_destroy(&(c_msg->payload)); - free(c_msg); - } - - arrayList_destroy(msg_list); - -} - -static void* zmq_recv_thread_func(void * arg) { - topic_subscription_pt sub = (topic_subscription_pt) arg; - - while (sub->running) { - - celixThreadMutex_lock(&sub->socket_lock); - - zframe_t* headerMsg = zframe_recv(sub->zmq_socket); - if (headerMsg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n"); - } else { - perror("PSA_ZMQ_TS: header_recv thread"); - } - } - else { - - pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg); - - if (zframe_more(headerMsg)) { - - zframe_t* payloadMsg = zframe_recv(sub->zmq_socket); - if (payloadMsg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("PSA_ZMQ_TS: payload_recv thread for topic got a signal and will exit.\n"); - } else { - perror("PSA_ZMQ_TS: payload_recv"); - } - zframe_destroy(&headerMsg); - } else { - - //Let's fetch all the messages from the socket - array_list_pt msg_list = NULL; - arrayList_create(&msg_list); - complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg)); - firstMsg->header = headerMsg; - firstMsg->payload = payloadMsg; - arrayList_add(msg_list, firstMsg); - - bool more = zframe_more(payloadMsg); - while (more) { - - zframe_t* h_msg = zframe_recv(sub->zmq_socket); - if (h_msg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n"); - } else { - perror("PSA_ZMQ_TS: h_recv"); - } - break; - } - - zframe_t* p_msg = zframe_recv(sub->zmq_socket); - if (p_msg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n"); - } else { - perror("PSA_ZMQ_TS: p_recv"); - } - zframe_destroy(&h_msg); - break; - } - - complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg)); - c_msg->header = h_msg; - c_msg->payload = p_msg; - arrayList_add(msg_list, c_msg); - - if (!zframe_more(p_msg)) { - more = false; - } - } - - celixThreadMutex_lock(&sub->ts_lock); - process_msg(sub, msg_list); - celixThreadMutex_unlock(&sub->ts_lock); - - } - - } //zframe_more(headerMsg) - else { - free(headerMsg); - printf("PSA_ZMQ_TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic); - } - - } // headerMsg != NULL - celixThreadMutex_unlock(&sub->socket_lock); - connectPendingPublishers(sub); - disconnectPendingPublishers(sub); - } // while - - return NULL; -} - -static void connectPendingPublishers(topic_subscription_pt sub) { - celixThreadMutex_lock(&sub->pendingConnections_lock); - while(!arrayList_isEmpty(sub->pendingConnections)) { - char * pubEP = arrayList_remove(sub->pendingConnections, 0); - pubsub_topicSubscriptionConnectPublisher(sub, pubEP); - free(pubEP); - } - celixThreadMutex_unlock(&sub->pendingConnections_lock); -} - -static void disconnectPendingPublishers(topic_subscription_pt sub) { - celixThreadMutex_lock(&sub->pendingDisconnections_lock); - while(!arrayList_isEmpty(sub->pendingDisconnections)) { - char * pubEP = arrayList_remove(sub->pendingDisconnections, 0); - pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP); - free(pubEP); - } - celixThreadMutex_unlock(&sub->pendingDisconnections_lock); -} - -static void sigusr1_sighandler(int signo){ - printf("PSA_ZMQ_TS: Topic subscription being shut down...\n"); - return; -} - -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ - bool check=false; - int major=0,minor=0; - - if(msgVersion!=NULL){ - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ - check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ - } - } - - return check; -} - -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = utils_stringHash(msgType); - return 0; -} - -static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){ - - if(handle==NULL){ - *part = NULL; - return -1; - } - - mp_handle_pt mp_handle = (mp_handle_pt)handle; - msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map, (void*)(uintptr_t) msgTypeId); - if(entry!=NULL){ - entry->retain = retain; - *part = entry->msgInst; - } - else{ - printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId); - *part=NULL; - return -2; - } - - return 0; - -} - -static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){ - - if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message - return NULL; - } - - mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle)); - mp_handle->svc_msg_db = svc_msg_db; - mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL); - - int i=1; //We skip the first message, it will be handle differently - for(;i<arrayList_size(rcv_msg_list);i++){ - complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i); - pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header); - - pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, (void*)(uintptr_t)(header->type)); - - if (msgSer!= NULL) { - void *msgInst = NULL; - - bool validVersion = checkVersion(msgSer->msgVersion,header); - - if(validVersion){ - celix_status_t status = msgSer->deserialize(msgSer, (const void*)zframe_data(c_msg->payload), 0, &msgInst); - - if(status == CELIX_SUCCESS){ - msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry)); - entry->msgInst = msgInst; - hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)header->type,entry); - } - } - } - } - - return mp_handle; - -} - -static void destroy_mp_handle(mp_handle_pt mp_handle){ - - hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry); - msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry); - pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId); - - if(msgSer!=NULL){ - if(!msgEntry->retain){ - msgSer->freeMsg(msgSer->handle,msgEntry->msgInst); - } - } - else{ - printf("PSA_ZMQ_TS: ERROR: Cannot find messageSerializer for msg %u, so cannot destroy it!\n",msgId); - } - - free(msgEntry); - } - hashMapIterator_destroy(iter); - - hashMap_destroy(mp_handle->rcv_msg_map,false,false); - free(mp_handle); -}
