http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c deleted file mode 100644 index 1fbdb08..0000000 --- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c +++ /dev/null @@ -1,1039 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * pubsub_admin_impl.c - * - * \date Sep 30, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <stdio.h> -#include <stdlib.h> - -#ifndef ANDROID -#include <ifaddrs.h> -#endif - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <string.h> - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netdb.h> - -#include "constants.h" -#include "utils.h" -#include "hash_map.h" -#include "array_list.h" -#include "bundle_context.h" -#include "bundle.h" -#include "service_reference.h" -#include "service_registration.h" -#include "log_helper.h" -#include "log_service.h" -#include "celix_threads.h" -#include "service_factory.h" - -#include "pubsub_admin_impl.h" -#include "topic_subscription.h" -#include "topic_publication.h" -#include "pubsub_endpoint.h" -#include "subscriber.h" -#include "pubsub_admin_match.h" - -static const char *DEFAULT_MC_IP = "224.100.1.1"; -static char *DEFAULT_MC_PREFIX = "224.100"; - -static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip); -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - -static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc); -static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); -static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); - -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { - celix_status_t status = CELIX_SUCCESS; - - *admin = calloc(1, sizeof(**admin)); - - if (!*admin) { - return CELIX_ENOMEM; - } 
- - char *mc_ip = NULL; - char *if_ip = NULL; - int sendSocket = -1; - - if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { - logHelper_start((*admin)->loghelper); - } - const char *mc_ip_prop = NULL; - bundleContext_getProperty(context,PSA_IP , &mc_ip_prop); - if(mc_ip_prop) { - mc_ip = strdup(mc_ip_prop); - } - -#ifndef ANDROID - if (mc_ip == NULL) { - const char *mc_prefix = NULL; - const char *interface = NULL; - int b0 = 0, b1 = 0, b2 = 0, b3 = 0; - bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix); - if(mc_prefix == NULL) { - mc_prefix = DEFAULT_MC_PREFIX; - } - - bundleContext_getProperty(context, PSA_ITF, &interface); - if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface); - } - - printf("IP Detected : %s\n", if_ip); - if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip); - b2 = 1; - b3 = 1; - } - - asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3); - - sendSocket = socket(AF_INET, SOCK_DGRAM, 0); - if(sendSocket == -1) { - perror("pubsubAdmin_create:socket"); - status = CELIX_SERVICE_EXCEPTION; - } - - if(status == CELIX_SUCCESS){ - char loop = 1; - if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) { - perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)"); - status = CELIX_SERVICE_EXCEPTION; - } - } - - if(status == CELIX_SUCCESS){ - struct in_addr multicast_interface; - inet_aton(if_ip, &multicast_interface); - if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) { - perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)"); - status = CELIX_SERVICE_EXCEPTION; - } - } - - } - - - if(status != CELIX_SUCCESS){ - logHelper_stop((*admin)->loghelper); 
- logHelper_destroy(&((*admin)->loghelper)); - if(sendSocket >=0){ - close(sendSocket); - } - if(if_ip != NULL){ - free(if_ip); - } - if(mc_ip != NULL){ - free(mc_ip); - } - return status; - } - else{ - (*admin)->sendSocket = sendSocket; - } - -#endif - - (*admin)->bundle_context= context; - (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); - (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); - arrayList_create(&((*admin)->noSerializerSubscriptions)); - arrayList_create(&((*admin)->noSerializerPublications)); - arrayList_create(&((*admin)->serializerList)); - - celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); - celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); - celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); - celixThreadMutex_create(&(*admin)->serializerListLock, NULL); - celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL); - - celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr); - celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE); - celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr); - - celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr); - celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE); - celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr); - - if (if_ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using 
%s as interface for multicast communication", if_ip); - (*admin)->ifIpAddress = if_ip; - } else { - (*admin)->ifIpAddress = strdup("127.0.0.1"); - } - - if (mc_ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip); - (*admin)->mcIpAddress = mc_ip; - } - else { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP); - (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP); - } - - return status; -} - - -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) -{ - celix_status_t status = CELIX_SUCCESS; - - free(admin->mcIpAddress); - free(admin->ifIpAddress); - - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - free((char*)hashMapEntry_getKey(entry)); - arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->pendingSubscriptions,false,false); - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - celixThreadMutex_lock(&admin->subscriptionsLock); - hashMap_destroy(admin->subscriptions,false,false); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - celixThreadMutex_lock(&admin->localPublicationsLock); - hashMap_destroy(admin->localPublications,true,false); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - celixThreadMutex_lock(&admin->externalPublicationsLock); - iter = hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - free((char*)hashMapEntry_getKey(entry)); - arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->externalPublications,false,false); - 
celixThreadMutex_unlock(&admin->externalPublicationsLock); - - celixThreadMutex_lock(&admin->serializerListLock); - arrayList_destroy(admin->serializerList); - celixThreadMutex_unlock(&admin->serializerListLock); - - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_destroy(admin->noSerializerSubscriptions); - arrayList_destroy(admin->noSerializerPublications); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - - celixThreadMutex_lock(&admin->usedSerializersLock); - - iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); - while(hashMapIterator_hasNext(iter)){ - arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false); - - iter = hashMapIterator_create(admin->topicPublicationsPerSerializer); - while(hashMapIterator_hasNext(iter)){ - arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->topicPublicationsPerSerializer,false,false); - - celixThreadMutex_unlock(&admin->usedSerializersLock); - - celixThreadMutex_destroy(&admin->usedSerializersLock); - celixThreadMutex_destroy(&admin->serializerListLock); - - celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr); - celixThreadMutex_destroy(&admin->noSerializerPendingsLock); - - celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); - celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr); - - celixThreadMutex_destroy(&admin->subscriptionsLock); - celixThreadMutex_destroy(&admin->localPublicationsLock); - celixThreadMutex_destroy(&admin->externalPublicationsLock); - - logHelper_stop(admin->loghelper); - - logHelper_destroy(&admin->loghelper); - - free(admin); - - return status; -} - -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - 
celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - - if(any_sub==NULL){ - - int i; - pubsub_serializer_service_t *best_serializer = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ - status = pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub); - } - else{ - printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - if (status == CELIX_SUCCESS){ - - /* Connect all internal publishers */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - } - arrayList_destroy(topic_publishers); - } - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Connect also all external publishers */ - celixThreadMutex_lock(&admin->externalPublicationsLock); - hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(extp_iter)){ - array_list_pt 
ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); - if(ext_pub_list!=NULL){ - for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - } - } - } - hashMapIterator_destroy(extp_iter); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - - - pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); - - status += pubsub_topicSubscriptionStart(any_sub); - - } - - if (status == CELIX_SUCCESS){ - hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); - connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false); - } - - } - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic); - - if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ - return pubsubAdmin_addAnySubscription(admin,subEP); - } - - /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - celixThreadMutex_lock(&admin->subscriptionsLock); - celixThreadMutex_lock(&admin->localPublicationsLock); - celixThreadMutex_lock(&admin->externalPublicationsLock); - - char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic); - - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - - if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic - 
pubsubAdmin_addSubscriptionToPendingList(admin,subEP); - } - else{ - int i; - topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); - - if(subscription == NULL) { - pubsub_serializer_service_t *best_serializer = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ - status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, subEP->scope, subEP->topic, best_serializer, &subscription); - } - else{ - printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - if (status==CELIX_SUCCESS){ - - /* Try to connect internal publishers */ - if(factory!=NULL){ - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;i<arrayList_size(topic_publishers);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); - } - } - arrayList_destroy(topic_publishers); - } - - } - - /* Look also for external publishers */ - if(ext_pub_list!=NULL){ - for(i=0;i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); - } - } - } - - pubsub_topicSubscriptionAddSubscriber(subscription,subEP); - - status += pubsub_topicSubscriptionStart(subscription); - - } - - if(status==CELIX_SUCCESS){ - - hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); - - connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false); - } - } - - if (status == CELIX_SUCCESS){ - pubsub_topicIncreaseNrSubscribers(subscription); - } - } - - free(scope_topic); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - celixThreadMutex_unlock(&admin->localPublicationsLock); - celixThreadMutex_unlock(&admin->subscriptionsLock); - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); - - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); - - celixThreadMutex_lock(&admin->subscriptionsLock); - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL){ - pubsub_topicDecreaseNrSubscribers(sub); - if(pubsub_topicGetNrSubscribers(sub) == 0) { - status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP); - } - } - celixThreadMutex_unlock(&admin->subscriptionsLock); - - if(sub==NULL){ - /* Maybe the endpoint was pending */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){ - status = CELIX_ILLEGAL_STATE; - } - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - free(scope_topic); - - - - return status; - -} - -celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic); - - const char* fwUUID = NULL; - - bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PSA_UDP_MC: Cannot retrieve 
fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - - if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { - - celixThreadMutex_lock(&admin->localPublicationsLock); - - service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); - - if (factory == NULL) { - topic_publication_pt pub = NULL; - pubsub_serializer_service_t *best_serializer = NULL; - if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){ - status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, admin->mcIpAddress, &pub); - } - else{ - printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic); - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerPublications,pubEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - if (status == CELIX_SUCCESS) { - status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); - if (status == CELIX_SUCCESS && factory != NULL) { - hashMap_put(admin->localPublications, strdup(scope_topic), factory); - connectTopicPubSubToSerializer(admin, best_serializer, pub, true); - } - } else { - printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); - } - } else { - //just add the new EP to the list - topic_publication_pt pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationAddPublisherEP(pub, pubEP); - } - - celixThreadMutex_unlock(&admin->localPublicationsLock); - } - else{ - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); - if (ext_pub_list == NULL) { - arrayList_create(&ext_pub_list); - hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list); - } - - arrayList_add(ext_pub_list, pubEP); - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Re-evaluate the pending subscriptions */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - - hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); - if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. - char* topic = (char*) hashMapEntry_getKey(pendingSub); - array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); - int i; - for (i = 0; i < arrayList_size(pendingSubList); i++) { - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); - pubsubAdmin_addSubscription(admin, subEP); - } - hashMap_remove(admin->pendingSubscriptions, scope_topic); - arrayList_clear(pendingSubList); - arrayList_destroy(pendingSubList); - free(topic); - } - - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - /* Connect the new publisher to the subscription for his topic, if there is any */ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); - if (sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); - if (any_sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); - } - - free(scope_topic); - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ - celix_status_t status = CELIX_SUCCESS; - int count = 0; - - printf("PSA_UDP_MC: Removing publication 
[FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic); - - const char* fwUUID = NULL; - - bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - - if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ - - celixThreadMutex_lock(&admin->localPublicationsLock); - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - if(factory!=NULL){ - topic_publication_pt pub = (topic_publication_pt)factory->handle; - pubsub_topicPublicationRemovePublisherEP(pub,pubEP); - } - celixThreadMutex_unlock(&admin->localPublicationsLock); - - if(factory==NULL){ - /* Maybe the endpoint was pending */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){ - status = CELIX_ILLEGAL_STATE; - } - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - } - else{ - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - if(ext_pub_list!=NULL){ - int i; - bool found = false; - for(i=0;!found && i<arrayList_size(ext_pub_list);i++){ - pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - found = pubsubEndpoint_equals(pubEP,p); - if (found){ - arrayList_remove(ext_pub_list,i); - } - } - // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) - for(i=0; i<arrayList_size(ext_pub_list);i++) { - pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if (strcmp(pubEP->endpoint,p->endpoint) == 0) { - count++; - } - } - - if(arrayList_size(ext_pub_list)==0){ - hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic); - char* topic = (char*)hashMapEntry_getKey(entry); - array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); - hashMap_remove(admin->externalPublications,topic); - arrayList_destroy(list); - free(topic); - } - } - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Check if this publisher was connected to one of our subscribers*/ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint); - } - - free(scope_topic); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic); - - celixThreadMutex_lock(&admin->localPublicationsLock); - char* scope_topic =createScopeTopicKey(scope, topic); - hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); - if(pubsvc_entry!=NULL){ - char* key = (char*)hashMapEntry_getKey(pubsvc_entry); - service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); - topic_publication_pt pub = (topic_publication_pt)factory->handle; - status += pubsub_topicPublicationStop(pub); - disconnectTopicPubSubFromSerializer(admin, pub, true); - status += pubsub_topicPublicationDestroy(pub); - 
hashMap_remove(admin->localPublications,scope_topic); - free(key); - free(factory); - } - free(scope_topic); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - return status; - -} - -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA_UDP_MC: Closing all subscriptions\n"); - - celixThreadMutex_lock(&admin->subscriptionsLock); - char* scope_topic =createScopeTopicKey(scope, topic); - hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic); - if(sub_entry!=NULL){ - char* topic = (char*)hashMapEntry_getKey(sub_entry); - - topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry); - status += pubsub_topicSubscriptionStop(ts); - disconnectTopicPubSubFromSerializer(admin, ts, false); - status += pubsub_topicSubscriptionDestroy(ts); - hashMap_remove(admin->subscriptions,topic); - free(topic); - - } - free(scope_topic); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - - -#ifndef ANDROID -static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) { - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - - struct ifaddrs *ifaddr, *ifa; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) != -1) - { - for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == NULL) - continue; - - if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { - if (interface == NULL) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - else if (strcmp(ifa->ifa_name, interface) == 0) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - } - } - - freeifaddrs(ifaddr); - } - - return status; -} -#endif - -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - 
celix_status_t status = CELIX_SUCCESS; - - char* scope_topic =createScopeTopicKey(subEP->scope, subEP->topic); - array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); - if(pendingListPerTopic==NULL){ - arrayList_create(&pendingListPerTopic); - hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); - } - arrayList_add(pendingListPerTopic,subEP); - free(scope_topic); - - return status; -} - - -celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){ - /* Assumption: serializers are all available at startup. - * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */ - - celix_status_t status = CELIX_SUCCESS; - int i=0; - - const char *serType = NULL; - serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); - if(serType == NULL){ - printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); - return CELIX_SERVICE_EXCEPTION; - } - - pubsub_admin_pt admin = (pubsub_admin_pt)handle; - celixThreadMutex_lock(&admin->serializerListLock); - arrayList_add(admin->serializerList, reference); - celixThreadMutex_unlock(&admin->serializerListLock); - - /* Now let's re-evaluate the pending */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - - for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){ - pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); - pubsub_serializer_service_t *best_serializer = NULL; - pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); - if(best_serializer != NULL){ /* Finally we have a valid serializer! 
*/ - pubsubAdmin_addSubscription(admin, ep); - } - } - - for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){ - pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); - pubsub_serializer_service_t *best_serializer = NULL; - pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); - if(best_serializer != NULL){ /* Finally we have a valid serializer! */ - pubsubAdmin_addPublication(admin, ep); - } - } - - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - - printf("PSA_UDP_MC: %s serializer added\n",serType); - - return status; -} - -celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ - - pubsub_admin_pt admin = (pubsub_admin_pt)handle; - int i=0, j=0; - const char *serType = NULL; - - serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); - if(serType == NULL){ - printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); - return CELIX_SERVICE_EXCEPTION; - } - - celixThreadMutex_lock(&admin->serializerListLock); - /* Remove the serializer from the list */ - arrayList_removeElement(admin->serializerList, reference); - celixThreadMutex_unlock(&admin->serializerListLock); - - celixThreadMutex_lock(&admin->usedSerializersLock); - array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service); - array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service); - celixThreadMutex_unlock(&admin->usedSerializersLock); - - /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */ - if(topicPubList!=NULL){ - for(i=0;i<arrayList_size(topicPubList);i++){ - topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i); - /* Stop the topic publication */ - pubsub_topicPublicationStop(topicPub); - /* Get the endpoints that are 
going to be orphan */ - array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub); - for(j=0;j<arrayList_size(pubList);j++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j); - /* Remove the publication */ - pubsubAdmin_removePublication(admin, pubEP); - /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(pubEP->endpoint!=NULL){ - free(pubEP->endpoint); - pubEP->endpoint = NULL; - } - /* Add the orphan endpoint to the noSerializer pending list */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerPublications,pubEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - arrayList_destroy(pubList); - - /* Cleanup also the localPublications hashmap*/ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications); - char *key = NULL; - service_factory_pt factory = NULL; - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - factory = (service_factory_pt)hashMapEntry_getValue(entry); - topic_publication_pt pub = (topic_publication_pt)factory->handle; - if(pub==topicPub){ - key = (char*)hashMapEntry_getKey(entry); - break; - } - } - hashMapIterator_destroy(iter); - if(key!=NULL){ - hashMap_remove(admin->localPublications, key); - free(factory); - free(key); - } - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Finally destroy the topicPublication */ - pubsub_topicPublicationDestroy(topicPub); - } - arrayList_destroy(topicPubList); - } - - /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */ - if(topicSubList!=NULL){ - for(i=0;i<arrayList_size(topicSubList);i++){ - topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i); - /* Stop the topic subscription */ - 
pubsub_topicSubscriptionStop(topicSub); - /* Get the endpoints that are going to be orphan */ - array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub); - for(j=0;j<arrayList_size(subList);j++){ - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j); - /* Remove the subscription */ - pubsubAdmin_removeSubscription(admin, subEP); - /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(subEP->endpoint!=NULL){ - free(subEP->endpoint); - subEP->endpoint = NULL; - } - /* Add the orphan endpoint to the noSerializer pending list */ - celixThreadMutex_lock(&admin->noSerializerPendingsLock); - arrayList_add(admin->noSerializerSubscriptions,subEP); - celixThreadMutex_unlock(&admin->noSerializerPendingsLock); - } - - /* Cleanup also the subscriptions hashmap*/ - celixThreadMutex_lock(&admin->subscriptionsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions); - char *key = NULL; - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry); - if(sub==topicSub){ - key = (char*)hashMapEntry_getKey(entry); - break; - } - } - hashMapIterator_destroy(iter); - if(key!=NULL){ - hashMap_remove(admin->subscriptions, key); - free(key); - } - celixThreadMutex_unlock(&admin->subscriptionsLock); - - /* Finally destroy the topicSubscription */ - pubsub_topicSubscriptionDestroy(topicSub); - } - arrayList_destroy(topicSubList); - } - - printf("PSA_UDP_MC: %s serializer removed\n",serType); - - - return CELIX_SUCCESS; -} - -celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&admin->serializerListLock); - status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score); - 
celixThreadMutex_unlock(&admin->serializerListLock); - - return status; -} - -/* This one recall the same logic as in the match function */ -static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){ - - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&admin->serializerListLock); - status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc); - celixThreadMutex_unlock(&admin->serializerListLock); - - return status; - -} - -static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ - - celixThreadMutex_lock(&admin->usedSerializersLock); - - hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; - array_list_pt list = (array_list_pt)hashMap_get(map,serializer); - if(list==NULL){ - arrayList_create(&list); - hashMap_put(map,serializer,list); - } - arrayList_add(list,topicPubSub); - - celixThreadMutex_unlock(&admin->usedSerializersLock); - -} - -static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ - - celixThreadMutex_lock(&admin->usedSerializersLock); - - hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; - hash_map_iterator_pt iter = hashMapIterator_create(map); - while(hashMapIterator_hasNext(iter)){ - array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter); - if(arrayList_removeElement(list, topicPubSub)){ //Found it! - break; - } - } - hashMapIterator_destroy(iter); - - celixThreadMutex_unlock(&admin->usedSerializersLock); - -}
http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c deleted file mode 100644 index e43ec29..0000000 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c +++ /dev/null @@ -1,444 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * topic_publication.c - * - * \date Sep 24, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <errno.h> - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> - -#include "array_list.h" -#include "celixbool.h" -#include "service_registration.h" -#include "utils.h" -#include "service_factory.h" -#include "version.h" - -#include "topic_publication.h" -#include "pubsub_common.h" -#include "publisher.h" -#include "large_udp.h" - -#include "pubsub_serializer.h" - -#define EP_ADDRESS_LEN 32 - -#define FIRST_SEND_DELAY 2 - -struct topic_publication { - int sendSocket; - char* endpoint; - service_registration_pt svcFactoryReg; - array_list_pt pub_ep_list; //List<pubsub_endpoint> - hash_map_pt boundServices; //<bundle_pt,bound_service> - celix_thread_mutex_t tp_lock; - pubsub_serializer_service_t *serializer; - struct sockaddr_in destAddr; -}; - -typedef struct publish_bundle_bound_service { - topic_publication_pt parent; - pubsub_publisher_t service; - bundle_pt bundle; - char *scope; - char *topic; - hash_map_pt msgTypes; - unsigned short getCount; - celix_thread_mutex_t mp_lock; - largeUdp_pt largeUdpHandle; -}* publish_bundle_bound_service_pt; - - -typedef struct pubsub_msg{ - pubsub_msg_header_pt header; - char* payload; - unsigned int payloadSize; -} pubsub_msg_t; - - -static unsigned int rand_range(unsigned int min, unsigned int max); - -static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); - -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); - -static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); - -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); - - -static void delay_first_send_for_late_joiners(void); - - -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){ - - char* ep = malloc(EP_ADDRESS_LEN); - memset(ep,0,EP_ADDRESS_LEN); - unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT); - snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port); - - - topic_publication_pt pub = calloc(1,sizeof(*pub)); - - arrayList_create(&(pub->pub_ep_list)); - pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL); - celixThreadMutex_create(&(pub->tp_lock),NULL); - - pub->endpoint = ep; - pub->sendSocket = sendSocket; - pub->destAddr.sin_family = AF_INET; - pub->destAddr.sin_addr.s_addr = inet_addr(bindIP); - pub->destAddr.sin_port = htons(port); - - pub->serializer = best_serializer; - - pubsub_topicPublicationAddPublisherEP(pub,pubEP); - - *out = pub; - - return CELIX_SUCCESS; -} - -celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&(pub->tp_lock)); - - free(pub->endpoint); - arrayList_destroy(pub->pub_ep_list); - - hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); - pubsub_destroyPublishBundleBoundService(bound); - } - hashMapIterator_destroy(iter); - hashMap_destroy(pub->boundServices,false,false); - - pub->svcFactoryReg = NULL; - pub->serializer = NULL; - - if(close(pub->sendSocket) != 0){ - status = CELIX_FILE_IO_EXCEPTION; - } - - 
celixThreadMutex_unlock(&(pub->tp_lock)); - - celixThreadMutex_destroy(&(pub->tp_lock)); - - free(pub); - - return status; -} - -celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){ - celix_status_t status = CELIX_SUCCESS; - - /* Let's register the new service */ - - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); - - if(pubEP!=NULL){ - service_factory_pt factory = calloc(1, sizeof(*factory)); - factory->handle = pub; - factory->getService = pubsub_topicPublicationGetService; - factory->ungetService = pubsub_topicPublicationUngetService; - - properties_pt props = properties_create(); - properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); - properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); - - status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); - - if(status != CELIX_SUCCESS){ - properties_destroy(props); - printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID); - } - else{ - *svcFactory = factory; - } - } - else{ - printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); - status = CELIX_SERVICE_EXCEPTION; - } - - return status; -} - -celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ - return serviceRegistration_unregister(pub->svcFactoryReg); -} - -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ - - celixThreadMutex_lock(&(pub->tp_lock)); - ep->endpoint = strdup(pub->endpoint); - arrayList_add(pub->pub_ep_list,ep); - celixThreadMutex_unlock(&(pub->tp_lock)); - - return CELIX_SUCCESS; -} - -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ - - celixThreadMutex_lock(&(pub->tp_lock)); - 
arrayList_removeElement(pub->pub_ep_list,ep); - celixThreadMutex_unlock(&(pub->tp_lock)); - - return CELIX_SUCCESS; -} - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ - array_list_pt list = NULL; - celixThreadMutex_lock(&(pub->tp_lock)); - list = arrayList_clone(pub->pub_ep_list); - celixThreadMutex_unlock(&(pub->tp_lock)); - return list; -} - - -static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { - celix_status_t status = CELIX_SUCCESS; - - topic_publication_pt publish = (topic_publication_pt)handle; - - celixThreadMutex_lock(&(publish->tp_lock)); - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound==NULL){ - bound = pubsub_createPublishBundleBoundService(publish,bundle); - if(bound!=NULL){ - hashMap_put(publish->boundServices,bundle,bound); - } - } - else{ - bound->getCount++; - } - - if (bound != NULL) { - *service = &bound->service; - } - - celixThreadMutex_unlock(&(publish->tp_lock)); - - return status; -} - -static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { - - topic_publication_pt publish = (topic_publication_pt)handle; - - celixThreadMutex_lock(&(publish->tp_lock)); - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound!=NULL){ - - bound->getCount--; - if(bound->getCount==0){ - pubsub_destroyPublishBundleBoundService(bound); - hashMap_remove(publish->boundServices,bundle); - } - - } - else{ - long bundleId = -1; - bundle_getBundleId(bundle,&bundleId); - printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId); - } - - /* service should be never used for unget, so let's set the pointer to NULL */ - *service = NULL; - - 
celixThreadMutex_unlock(&(publish->tp_lock)); - - return CELIX_SUCCESS; -} - -static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){ - const int iovec_len = 3; // header + size + payload - bool ret = true; - - struct iovec msg_iovec[iovec_len]; - msg_iovec[0].iov_base = msg->header; - msg_iovec[0].iov_len = sizeof(*msg->header); - msg_iovec[1].iov_base = &msg->payloadSize; - msg_iovec[1].iov_len = sizeof(msg->payloadSize); - msg_iovec[2].iov_base = msg->payload; - msg_iovec[2].iov_len = msg->payloadSize; - - delay_first_send_for_late_joiners(); - - if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) { - perror("send_pubsub_msg:sendSocket"); - ret = false; - } - - if(releaseCallback) { - releaseCallback->release(msg->payload, bound); - } - return ret; - -} - - -static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) { - int status = 0; - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; - - celixThreadMutex_lock(&(bound->parent->tp_lock)); - celixThreadMutex_lock(&(bound->mp_lock)); - - pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId); - - if (msgSer != NULL) { - int major=0, minor=0; - - pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); - strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); - msg_hdr->type = msgTypeId; - - - if (msgSer->msgVersion != NULL){ - version_getMajor(msgSer->msgVersion, &major); - version_getMinor(msgSer->msgVersion, &minor); - msg_hdr->major = major; - msg_hdr->minor = minor; - } - - void* serializedOutput = NULL; - size_t serializedOutputLen = 0; - msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); - - pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t)); - 
/*
 * Returns a pseudo-random value in the closed interval [min, max], drawn
 * from random().
 *
 * The previous scaling divided by RAND_MAX, so a draw equal to RAND_MAX
 * produced max+1; it also wrapped around for max < min and for the full
 * unsigned range. Here the draw is mapped to [0, 1) and the span is computed
 * in double precision, so the result can never leave the interval.
 * A degenerate or inverted interval (max <= min) yields min.
 */
static unsigned int rand_range(unsigned int min, unsigned int max){

	if(max <= min){
		return min;
	}

	/* random() returns 0 .. 2^31-1, hence dividing by 2^31 stays strictly below 1.0 */
	double fraction = (double)random() / 2147483648.0;
	double span = (double)(max - min) + 1.0;

	return min + (unsigned int)(span * fraction);

}
boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes); - } - - if(boundSvc->scope!=NULL){ - free(boundSvc->scope); - } - - if(boundSvc->topic!=NULL){ - free(boundSvc->topic); - } - - largeUdp_destroy(boundSvc->largeUdpHandle); - - celixThreadMutex_unlock(&boundSvc->mp_lock); - celixThreadMutex_destroy(&boundSvc->mp_lock); - - free(boundSvc); - -} - -static void delay_first_send_for_late_joiners(){ - - static bool firstSend = true; - - if(firstSend){ - printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); - sleep(FIRST_SEND_DELAY); - firstSend = false; - } -} http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c deleted file mode 100644 index d8e6f45..0000000 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c +++ /dev/null @@ -1,635 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * topic_subscription.c - * - * \date Oct 2, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <string.h> -#include <stdlib.h> -#include <unistd.h> -#include <signal.h> - -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/epoll.h> -#include <netinet/in.h> -#include <arpa/inet.h> - -#include "utils.h" -#include "celix_errno.h" -#include "constants.h" -#include "version.h" - -#include "topic_subscription.h" -#include "topic_publication.h" -#include "subscriber.h" -#include "publisher.h" -#include "large_udp.h" - -#include "pubsub_serializer.h" - -#define MAX_EPOLL_EVENTS 10 -#define RECV_THREAD_TIMEOUT 5 -#define UDP_BUFFER_SIZE 65535 -#define MAX_UDP_SESSIONS 16 - -struct topic_subscription{ - char* ifIpAddress; - service_tracker_pt tracker; - array_list_pt sub_ep_list; - celix_thread_t recv_thread; - bool running; - celix_thread_mutex_t ts_lock; - bundle_context_pt context; - - pubsub_serializer_service_t *serializer; - - int topicEpollFd; // EPOLL filedescriptor where the sockets are registered. 
- hash_map_pt servicesMap; // key = service, value = msg types map - hash_map_pt socketMap; // key = URL, value = listen-socket - celix_thread_mutex_t socketMap_lock; - - celix_thread_mutex_t pendingConnections_lock; - array_list_pt pendingConnections; - - array_list_pt pendingDisconnections; - celix_thread_mutex_t pendingDisconnections_lock; - - //array_list_pt rawServices; - unsigned int nrSubscribers; - largeUdp_pt largeUdpHandle; -}; - -typedef struct msg_map_entry{ - bool retain; - void* msgInst; -}* msg_map_entry_pt; - -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); -static void* udp_recv_thread_func(void* arg); -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); -static void sigusr1_sighandler(int signo); -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); -static void connectPendingPublishers(topic_subscription_pt sub); -static void disconnectPendingPublishers(topic_subscription_pt sub); - - -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ - celix_status_t status = CELIX_SUCCESS; - - topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); - ts->context = bundle_context; - ts->ifIpAddress = strdup(ifIp); -#if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX -#else - ts->topicEpollFd = epoll_create1(0); -#endif - if(ts->topicEpollFd == -1) { - status += CELIX_SERVICE_EXCEPTION; - } - - ts->running = false; - ts->nrSubscribers = 0; - ts->serializer = best_serializer; - - celixThreadMutex_create(&ts->ts_lock,NULL); - arrayList_create(&ts->sub_ep_list); - ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); - ts->socketMap = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - - arrayList_create(&ts->pendingConnections); - arrayList_create(&ts->pendingDisconnections); - celixThreadMutex_create(&ts->pendingConnections_lock, NULL); - celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); - celixThreadMutex_create(&ts->socketMap_lock, NULL); - - ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS); - - char filter[128]; - memset(filter,0,128); - if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { - // default scope, means that subscriber has not defined a scope property - snprintf(filter, 128, "(&(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic); - - } else { - snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic, - PUBSUB_SUBSCRIBER_SCOPE,scope); - } - - service_tracker_customizer_pt customizer = NULL; - status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); - status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); - - struct sigaction actions; - memset(&actions, 0, sizeof(actions)); - sigemptyset(&actions.sa_mask); - actions.sa_flags = 0; - actions.sa_handler = sigusr1_sighandler; - - sigaction(SIGUSR1,&actions,NULL); - - if (status == CELIX_SUCCESS) { - *out=ts; - } - - return status; -} - -celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - ts->running = false; - free(ts->ifIpAddress); - serviceTracker_destroy(ts->tracker); - arrayList_clear(ts->sub_ep_list); - arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->servicesMap,false,false); - - celixThreadMutex_lock(&ts->socketMap_lock); - hashMap_destroy(ts->socketMap,true,true); - 
celixThreadMutex_unlock(&ts->socketMap_lock); - celixThreadMutex_destroy(&ts->socketMap_lock); - - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_destroy(ts->pendingConnections); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - celixThreadMutex_destroy(&ts->pendingConnections_lock); - - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_destroy(ts->pendingDisconnections); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - celixThreadMutex_destroy(&ts->pendingDisconnections_lock); - - largeUdp_destroy(ts->largeUdpHandle); -#if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX -#else - close(ts->topicEpollFd); -#endif - - celixThreadMutex_unlock(&ts->ts_lock); - - celixThreadMutex_destroy(&ts->ts_lock); - - free(ts); - - return status; -} - -celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - status = serviceTracker_open(ts->tracker); - - ts->running = true; - - if(status==CELIX_SUCCESS){ - status=celixThread_create(&ts->recv_thread,NULL,udp_recv_thread_func,ts); - } - - return status; -} - -celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - - ts->running = false; - - pthread_kill(ts->recv_thread.thread,SIGUSR1); - - celixThread_join(ts->recv_thread,NULL); - - status = serviceTracker_close(ts->tracker); - - celixThreadMutex_lock(&ts->socketMap_lock); - hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap); - while(hashMapIterator_hasNext(it)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(it); - char *url = hashMapEntry_getKey(entry); - int *s = hashMapEntry_getValue(entry); - memset(&ev, 0, sizeof(ev)); - if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) { - printf("in if error()\n"); - perror("epoll_ctl() EPOLL_CTL_DEL"); - status += CELIX_SERVICE_EXCEPTION; - } - free(s); - 
free(url); - //hashMapIterator_remove(it); - } - hashMapIterator_destroy(it); - hashMap_clear(ts->socketMap, false, false); - celixThreadMutex_unlock(&ts->socketMap_lock); - - - return status; -} - -celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) { - - printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL); - - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->socketMap_lock); - - if(!hashMap_containsKey(ts->socketMap, pubURL)){ - - int *recvSocket = calloc(sizeof(int), 1); - *recvSocket = socket(AF_INET, SOCK_DGRAM, 0); - if (*recvSocket < 0) { - perror("pubsub_topicSubscriptionCreate:socket"); - status = CELIX_SERVICE_EXCEPTION; - } - - if (status == CELIX_SUCCESS){ - int reuse = 1; - if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) { - perror("setsockopt() SO_REUSEADDR"); - status = CELIX_SERVICE_EXCEPTION; - } - } - - if(status == CELIX_SUCCESS){ - // TODO Check if there is a better way to parse the URL to IP/Portnr - //replace ':' by spaces - char *url = strdup(pubURL); - char *pt = url; - while((pt=strchr(pt, ':')) != NULL) { - *pt = ' '; - } - char mcIp[100]; - unsigned short mcPort; - sscanf(url, "udp //%s %hu", mcIp, &mcPort); - free(url); - - printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort); - - struct ip_mreq mc_addr; - mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp); - mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress); - printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress); - if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) { - perror("setsockopt() IP_ADD_MEMBERSHIP"); - status = CELIX_SERVICE_EXCEPTION; - } - - if (status == CELIX_SUCCESS){ - struct sockaddr_in mcListenAddr; - mcListenAddr.sin_family = AF_INET; - mcListenAddr.sin_addr.s_addr = INADDR_ANY; - mcListenAddr.sin_port = htons(mcPort); - if(bind(*recvSocket, 
(struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) { - perror("bind()"); - status = CELIX_SERVICE_EXCEPTION; - } - } - - if (status == CELIX_SUCCESS){ -#if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX -#else - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - ev.events = EPOLLIN; - ev.data.fd = *recvSocket; - if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) { - perror("epoll_ctl() EPOLL_CTL_ADD"); - status = CELIX_SERVICE_EXCEPTION; - } -#endif - } - - } - - if (status == CELIX_SUCCESS){ - hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket); - } - else{ - free(recvSocket); - } - } - - celixThreadMutex_unlock(&ts->socketMap_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_add(ts->pendingConnections, url); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - return status; -} - -celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_add(ts->pendingDisconnections, url); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - return status; -} - -celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ - printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL); - celix_status_t status = CELIX_SUCCESS; - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - - celixThreadMutex_lock(&ts->socketMap_lock); - - if (hashMap_containsKey(ts->socketMap, pubURL)){ - -#if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX -#else - int *s = hashMap_remove(ts->socketMap, pubURL); - 
if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) { - printf("in if error()\n"); - perror("epoll_ctl() EPOLL_CTL_DEL"); - status = CELIX_SERVICE_EXCEPTION; - } - free(s); -#endif - - } - - celixThreadMutex_unlock(&ts->socketMap_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - arrayList_add(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); - - return status; - -} - -celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers++; - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - arrayList_removeElement(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers--; - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { - return ts->nrSubscribers; -} - -array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){ - return sub->sub_ep_list; -} - - -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->servicesMap, service)) { - bundle_pt bundle = NULL; - hash_map_pt msgTypes = NULL; - - serviceReference_getBundle(reference, &bundle); - - 
if(ts->serializer != NULL && bundle!=NULL){ - ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes); - if(msgTypes != NULL){ - hashMap_put(ts->servicesMap, service, msgTypes); - printf("PSA_UDP_MC_TS: New subscriber registered.\n"); - } - } - else{ - printf("PSA_UDP_MC_TS: Cannot register new subscriber.\n"); - status = CELIX_SERVICE_EXCEPTION; - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; - -} - -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); - if(msgTypes!=NULL && ts->serializer!=NULL){ - ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes); - printf("PSA_ZMQ_TS: Subscriber unregistered.\n"); - } - else{ - printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n"); - status = CELIX_SERVICE_EXCEPTION; - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - printf("PSA_UDP_MC_TS: Subscriber unregistered.\n"); - return status; -} - - -static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){ - - celixThreadMutex_lock(&sub->ts_lock); - hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - hash_map_pt msgTypes = hashMapEntry_getValue(entry); - - pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )msg->header.type); - if (msgSer == NULL) { - printf("PSA_UDP_MC_TS: Serializer not available for message %d.\n",msg->header.type); - } - else{ - void *msgInst = NULL; - bool validVersion = checkVersion(msgSer->msgVersion,&msg->header); - - if(validVersion){ - - celix_status_t status = 
msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst); - - if (status == CELIX_SUCCESS) { - bool release = true; - pubsub_multipart_callbacks_t mp_callbacks; - mp_callbacks.handle = sub; - mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; - mp_callbacks.getMultipart = NULL; - - subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release); - - if(release){ - msgSer->freeMsg(msgSer,msgInst); - } - } - else{ - printf("PSA_UDP_MC_TS: Cannot deserialize msgType %s.\n",msgSer->msgName); - } - - } - else{ - int major=0,minor=0; - version_getMajor(msgSer->msgVersion,&major); - version_getMinor(msgSer->msgVersion,&minor); - printf("PSA_UDP_MC_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", - msgSer->msgName,major,minor,msg->header.major,msg->header.minor); - } - - } - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&sub->ts_lock); -} - -static void* udp_recv_thread_func(void * arg) { - topic_subscription_pt sub = (topic_subscription_pt) arg; - -#if defined(__APPLE__) && defined(__MACH__) - //TODO: use kqueue for OSX - //struct kevent events[MAX_EPOLL_EVENTS]; - while (sub->running) { - int nfds = 0; - if(nfds > 0) { - pubsub_udp_msg_t* udpMsg = NULL; - process_msg(sub, udpMsg); - } - } -#else - struct epoll_event events[MAX_EPOLL_EVENTS]; - - while (sub->running) { - int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000); - int i; - for(i = 0; i < nfds; i++ ) { - unsigned int index; - unsigned int size; - if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) { - // Handle data - pubsub_udp_msg_t *udpMsg = NULL; - if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) { - printf("PSA_UDP_MC_TS: ERROR largeUdp_read with index %d\n", index); - continue; - } - - process_msg(sub, udpMsg); - - free(udpMsg); - } - } - 
connectPendingPublishers(sub); - disconnectPendingPublishers(sub); - } -#endif - - return NULL; -} - -static void connectPendingPublishers(topic_subscription_pt sub) { - celixThreadMutex_lock(&sub->pendingConnections_lock); - while(!arrayList_isEmpty(sub->pendingConnections)) { - char * pubEP = arrayList_remove(sub->pendingConnections, 0); - pubsub_topicSubscriptionConnectPublisher(sub, pubEP); - free(pubEP); - } - celixThreadMutex_unlock(&sub->pendingConnections_lock); -} - -static void disconnectPendingPublishers(topic_subscription_pt sub) { - celixThreadMutex_lock(&sub->pendingDisconnections_lock); - while(!arrayList_isEmpty(sub->pendingDisconnections)) { - char * pubEP = arrayList_remove(sub->pendingDisconnections, 0); - pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP); - free(pubEP); - } - celixThreadMutex_unlock(&sub->pendingDisconnections_lock); -} - -static void sigusr1_sighandler(int signo){ - printf("PSA_UDP_MC_TS: Topic subscription being shut down...\n"); - return; -} - -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ - bool check=false; - int major=0,minor=0; - - if(msgVersion!=NULL){ - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ - check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ - } - } - - return check; -} - -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = utils_stringHash(msgType); - return 0; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/large_udp.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/src/large_udp.c b/pubsub/pubsub_admin_udp_mc/src/large_udp.c new file mode 100644 index 0000000..7455925 --- /dev/null +++ 
b/pubsub/pubsub_admin_udp_mc/src/large_udp.c @@ -0,0 +1,372 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * large_udp.c + * + * \date Mar 1, 2016 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include "large_udp.h" + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> +#include <array_list.h> +#include <pthread.h> + +#define MAX_UDP_MSG_SIZE 65535 /* 2^16 -1 */ +#define IP_HEADER_SIZE 20 +#define UDP_HEADER_SIZE 8 +//#define MTU_SIZE 1500 +#define MTU_SIZE 8000 +#define MAX_MSG_VECTOR_LEN 64 + +//#define NO_IP_FRAGMENTATION + +struct largeUdp { + unsigned int maxNrLists; + array_list_pt udpPartLists; + pthread_mutex_t dbLock; +}; + +typedef struct udpPartList { + unsigned int msg_ident; + unsigned int msg_size; + unsigned int nrPartsRemaining; + char *data; +} *udpPartList_pt; + + +typedef struct msg_part_header { + unsigned int msg_ident; + unsigned int total_msg_size; + unsigned int part_msg_size; + unsigned int offset; +} msg_part_header_t; + +#ifdef NO_IP_FRAGMENTATION +#define MAX_PART_SIZE (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) )) 
+#else +#define MAX_PART_SIZE (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) )) +#endif + +typedef struct msg_part { + msg_part_header_t header; + char data[MAX_PART_SIZE]; +} msg_part_t; + +// +// Create a handle +// +largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions) +{ + printf("## Creating large UDP\n"); + largeUdp_pt handle = calloc(sizeof(*handle), 1); + if(handle != NULL) { + handle->maxNrLists = maxNrUdpReceptions; + if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) { + free(handle); + handle = NULL; + } + pthread_mutex_init(&handle->dbLock, 0); + } + + return handle; +} + +// +// Destroys the handle +// +void largeUdp_destroy(largeUdp_pt handle) +{ + printf("### Destroying large UDP\n"); + if(handle != NULL) { + pthread_mutex_lock(&handle->dbLock); + int nrUdpLists = arrayList_size(handle->udpPartLists); + int i; + for(i=0; i < nrUdpLists; i++) { + udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, i); + if(udpPartList) { + if(udpPartList->data) { + free(udpPartList->data); + udpPartList->data = NULL; + } + free(udpPartList); + } + } + arrayList_destroy(handle->udpPartLists); + handle->udpPartLists = NULL; + pthread_mutex_unlock(&handle->dbLock); + pthread_mutex_destroy(&handle->dbLock); + free(handle); + } +} + +// +// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP. 
+// +int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen) +{ + int n; + int result = 0; + msg_part_header_t header; + + int written = 0; + header.msg_ident = (unsigned int)random(); + header.total_msg_size = 0; + for(n = 0; n < len ;n++) { + header.total_msg_size += largeMsg_iovec[n].iov_len; + } + int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1; + + struct iovec msg_iovec[MAX_MSG_VECTOR_LEN]; + struct msghdr msg; + msg.msg_name = dest_addr; + msg.msg_namelen = addrlen; + msg.msg_flags = 0; + msg.msg_iov = msg_iovec; + msg.msg_iovlen = 2; // header and payload; + msg.msg_control = NULL; + msg.msg_controllen = 0; + + msg.msg_iov[0].iov_base = &header; + msg.msg_iov[0].iov_len = sizeof(header); + + for(n = 0; n < nr_buffers; n++) { + + header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE)); + header.offset = n * MAX_PART_SIZE; + int remainingOffset = header.offset; + int recvPart = 0; + // find the start of the part + while(remainingOffset > largeMsg_iovec[recvPart].iov_len) { + remainingOffset -= largeMsg_iovec[recvPart].iov_len; + recvPart++; + } + int remainingData = header.part_msg_size; + int sendPart = 1; + msg.msg_iovlen = 1; + + // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal. + while(remainingData > 0) { + int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? 
(largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData); + msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset; + msg.msg_iov[sendPart].iov_len = partLen; + remainingData -= partLen; + remainingOffset = 0; + sendPart++; + recvPart++; + msg.msg_iovlen++; + } + int tmp, tmptot; + for(tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) { + tmptot += msg.msg_iov[tmp].iov_len; + } + + int w = sendmsg(fd, &msg, 0); + if(w == -1) { + perror("send()"); + result = -1; + break; + } + written += w; + } + + return (result == 0 ? written : result); +} + +// +// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP. +// +int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen) +{ + int n; + int nr_buffers = (count / MAX_PART_SIZE) + 1; + int result = 0; + msg_part_header_t header; + + int written = 0; + header.msg_ident = (unsigned int)random(); + header.total_msg_size = count; + char *databuf = buf; + + struct iovec msg_iovec[2]; + struct msghdr msg; + msg.msg_name = dest_addr; + msg.msg_namelen = addrlen; + msg.msg_flags = 0; + msg.msg_iov = msg_iovec; + msg.msg_iovlen = 2; // header and payload; + msg.msg_control = NULL; + msg.msg_controllen = 0; + + msg.msg_iov[0].iov_base = &header; + msg.msg_iov[0].iov_len = sizeof(header); + + for(n = 0; n < nr_buffers; n++) { + + header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? 
MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE)); + header.offset = n * MAX_PART_SIZE; + msg.msg_iov[1].iov_base = &databuf[header.offset]; + msg.msg_iov[1].iov_len = header.part_msg_size; + int w = sendmsg(fd, &msg, 0); + if(w == -1) { + perror("send()"); + result = -1; + break; + } + written += w; + //usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost) + } + + return (result == 0 ? written : result); +} + +// +// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure +// If the message is completely reassembled true is returned and the index and size have valid values +// +bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) { + msg_part_header_t header; + int result = false; + // Only read the header, we don't know yet where to store the payload + if(recv(fd, &header, sizeof(header), MSG_PEEK) < 0) { + perror("read()"); + return false; + } + + struct iovec msg_vec[2]; + struct msghdr msg; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_flags = 0; + msg.msg_iov = msg_vec; + msg.msg_iovlen = 2; // header and payload; + msg.msg_control = NULL; + msg.msg_controllen = 0; + + msg.msg_iov[0].iov_base = &header; + msg.msg_iov[0].iov_len = sizeof(header); + + pthread_mutex_lock(&handle->dbLock); + + int nrUdpLists = arrayList_size(handle->udpPartLists); + int i; + bool found = false; + for(i = 0; i < nrUdpLists; i++) { + udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i); + if(udpPartList->msg_ident == header.msg_ident) { + found = true; + + //sanity check + if(udpPartList->msg_size != header.total_msg_size) { + // Corruption occurred. Remove the existing administration and build up a new one. 
+ arrayList_remove(handle->udpPartLists, i); + free(udpPartList->data); + free(udpPartList); + found = false; + break; + } + + msg.msg_iov[1].iov_base = &udpPartList->data[header.offset]; + msg.msg_iov[1].iov_len = header.part_msg_size; + if(recvmsg(fd, &msg, 0)<0){ + found=true; + result=false; + break; + } + + udpPartList->nrPartsRemaining--; + if(udpPartList->nrPartsRemaining == 0) { + *index = i; + *size = udpPartList->msg_size; + result = true; + break; + } else { + result = false; // not complete + break; + } + } + } + + if(found == false) { + udpPartList_pt udpPartList = NULL; + if(nrUdpLists == handle->maxNrLists) { + // remove list at index 0 + udpPartList = arrayList_remove(handle->udpPartLists, 0); + fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining ); + free(udpPartList->data); + free(udpPartList); + nrUdpLists--; + } + udpPartList = calloc(sizeof(*udpPartList), 1); + udpPartList->msg_ident = header.msg_ident; + udpPartList->msg_size = header.total_msg_size; + udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE; + udpPartList->data = calloc(sizeof(char), header.total_msg_size); + + msg.msg_iov[1].iov_base = &udpPartList->data[header.offset]; + msg.msg_iov[1].iov_len = header.part_msg_size; + if(recvmsg(fd, &msg, 0)<0){ + free(udpPartList->data); + free(udpPartList); + result=false; + } + else{ + arrayList_add(handle->udpPartLists, udpPartList); + + if(udpPartList->nrPartsRemaining == 0) { + *index = nrUdpLists; + *size = udpPartList->msg_size; + result = true; + } else { + result = false; + } + } + + } + + pthread_mutex_unlock(&handle->dbLock); + + return result; +} + +// +// Read out the message which is indicated available by the largeUdp_dataAvailable function +// +int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size) +{ + int result = 0; + pthread_mutex_lock(&handle->dbLock); + + udpPartList_pt udpPartList = 
arrayList_remove(handle->udpPartLists, index); + if(udpPartList) { + *buffer = udpPartList->data; + free(udpPartList); + } else { + result = -1; + } + pthread_mutex_unlock(&handle->dbLock); + + return result; +} http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/large_udp.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/src/large_udp.h b/pubsub/pubsub_admin_udp_mc/src/large_udp.h new file mode 100644 index 0000000..a21e654 --- /dev/null +++ b/pubsub/pubsub_admin_udp_mc/src/large_udp.h @@ -0,0 +1,45 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ +/* + * large_udp.h + * + * \date Mar 1, 2016 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef _LARGE_UDP_H_ +#define _LARGE_UDP_H_ +#include <stdbool.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> + +typedef struct largeUdp *largeUdp_pt; + +largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions); +void largeUdp_destroy(largeUdp_pt handle); + +int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen); +int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen); +bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size); +int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size); + +#endif /* _LARGE_UDP_H_ */
