http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c index 409c7a5..cf51ed9 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c @@ -49,51 +49,49 @@ #include "pubsub_utils.h" #ifdef BUILD_WITH_ZMQ_SECURITY - #include "zmq_crypto.h" +#include "zmq_crypto.h" - #define MAX_CERT_PATH_LENGTH 512 +#define MAX_CERT_PATH_LENGTH 512 #endif #define POLL_TIMEOUT 250 #define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" -struct topic_subscription { +struct topic_subscription{ + zsock_t* zmq_socket; zcert_t * zmq_cert; zcert_t * zmq_pub_cert; - pthread_mutex_t socket_lock; //Protects zmq_socket access + pthread_mutex_t socket_lock; service_tracker_pt tracker; array_list_pt sub_ep_list; celix_thread_t recv_thread; bool running; - celix_thread_mutex_t ts_lock; //Protects topic_subscription data structure access + celix_thread_mutex_t ts_lock; bundle_context_pt context; - hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t* - hash_map_pt bundleMap; //key = service ptr, value = bundle_pt - array_list_pt pendingConnections; - array_list_pt pendingDisconnections; + pubsub_serializer_service_t *serializer; + + hash_map_pt servicesMap; // key = service, value = msg types map celix_thread_mutex_t pendingConnections_lock; + array_list_pt pendingConnections; + + array_list_pt pendingDisconnections; celix_thread_mutex_t pendingDisconnections_lock; + unsigned int nrSubscribers; - pubsub_serializer_service_t* serializerSvc; }; -/* Note: correct locking order is - * 1. socket_lock - * 2. ts_lock - */ - -typedef struct complete_zmq_msg { +typedef struct complete_zmq_msg{ zframe_t* header; zframe_t* payload; }* complete_zmq_msg_pt; -typedef struct mp_handle { - pubsub_msg_serializer_map_t* map; +typedef struct mp_handle{ + hash_map_pt svc_msg_db; hash_map_pt rcv_msg_map; -} mp_handle_t; +}* mp_handle_pt; typedef struct msg_map_entry{ bool retain; @@ -107,100 +105,66 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); static void sigusr1_sighandler(int signo); static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); -static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list); -static void destroy_mp_handle(mp_handle_t* mp_handle); +static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list); +static void destroy_mp_handle(mp_handle_pt mp_handle); static void connectPendingPublishers(topic_subscription_pt sub); static void disconnectPendingPublishers(topic_subscription_pt sub); -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){ +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY - if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC) != 0){ - char* secure_topics = NULL; - bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics); - - if (secure_topics){ - array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics); - - int i; - int secure_topics_size = arrayList_size(secure_topics_list); - for (i = 0; i < secure_topics_size; i++){ - char* top = arrayList_get(secure_topics_list, i); - if (strcmp(topic, top) == 0){ - printf("TS: Secure topic: '%s'\n", top); - subEP->is_secure = true; - } - free(top); - top = NULL; - } - - arrayList_destroy(secure_topics_list); - } + char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); + if (keys_bundle_dir == NULL){ + return CELIX_SERVICE_EXCEPTION; } - zcert_t* sub_cert = NULL; - zcert_t* pub_cert = NULL; - const char* pub_key = NULL; - if (subEP->is_secure){ - char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); - if (keys_bundle_dir == NULL){ - return CELIX_SERVICE_EXCEPTION; - } - - const char* keys_file_path = NULL; - const char* keys_file_name = NULL; - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); + const char* keys_file_path = NULL; + const char* keys_file_name = NULL; + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); - char sub_cert_path[MAX_CERT_PATH_LENGTH]; - char pub_cert_path[MAX_CERT_PATH_LENGTH]; + char sub_cert_path[MAX_CERT_PATH_LENGTH]; + char pub_cert_path[MAX_CERT_PATH_LENGTH]; - //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" - snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); - snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); - free(keys_bundle_dir); + //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" + snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); + snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); + free(keys_bundle_dir); - printf("TS: Loading subscriber key '%s'\n", sub_cert_path); - printf("TS: Loading publisher key '%s'\n", pub_cert_path); - - sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); - if (sub_cert == NULL){ - printf("TS: Cannot load key '%s'\n", sub_cert_path); - printf("TS: Topic '%s' NOT SECURED !\n", topic); - subEP->is_secure = false; - } + printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path); + printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path); - pub_cert = zcert_load(pub_cert_path); - if (sub_cert != NULL && pub_cert == NULL){ - zcert_destroy(&sub_cert); - printf("TS: Cannot load key '%s'\n", pub_cert_path); - printf("TS: Topic '%s' NOT SECURED !\n", topic); - subEP->is_secure = false; - } + zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); + if (sub_cert == NULL){ + printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path); + return CELIX_SERVICE_EXCEPTION; + } - pub_key = zcert_public_txt(pub_cert); + zcert_t* pub_cert = zcert_load(pub_cert_path); + if (pub_cert == NULL){ + zcert_destroy(&sub_cert); + printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path); + return CELIX_SERVICE_EXCEPTION; } + + const char* pub_key = zcert_public_txt(pub_cert); #endif zsock_t* zmq_s = zsock_new (ZMQ_SUB); if(zmq_s==NULL){ - #ifdef BUILD_WITH_ZMQ_SECURITY - if (subEP->is_secure){ - zcert_destroy(&sub_cert); - zcert_destroy(&pub_cert); - } - #endif +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_destroy(&sub_cert); + zcert_destroy(&pub_cert); +#endif return CELIX_SERVICE_EXCEPTION; } - #ifdef BUILD_WITH_ZMQ_SECURITY - if (subEP->is_secure){ - zcert_apply (sub_cert, zmq_s); - zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber - } - #endif +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_apply (sub_cert, zmq_s); + zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber +#endif if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){ zsock_set_subscribe (zmq_s, ""); @@ -214,20 +178,18 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, ts->zmq_socket = zmq_s; ts->running = false; ts->nrSubscribers = 0; - ts->serializerSvc = NULL; + ts->serializer = best_serializer; - #ifdef BUILD_WITH_ZMQ_SECURITY - if (subEP->is_secure){ - ts->zmq_cert = sub_cert; - ts->zmq_pub_cert = pub_cert; - } - #endif +#ifdef BUILD_WITH_ZMQ_SECURITY + ts->zmq_cert = sub_cert; + ts->zmq_pub_cert = pub_cert; +#endif celixThreadMutex_create(&ts->socket_lock, NULL); celixThreadMutex_create(&ts->ts_lock,NULL); arrayList_create(&ts->sub_ep_list); - ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL); - ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); + arrayList_create(&ts->pendingConnections); arrayList_create(&ts->pendingDisconnections); celixThreadMutex_create(&ts->pendingConnections_lock, NULL); @@ -236,17 +198,17 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char filter[128]; memset(filter,0,128); if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { - // default scope, means that subscriber has not defined a scope property - snprintf(filter, 128, "(&(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic); + // default scope, means that subscriber has not defined a scope property + snprintf(filter, 128, "(&(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic); } else { - snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic, - PUBSUB_SUBSCRIBER_SCOPE,scope); - } + snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic, + PUBSUB_SUBSCRIBER_SCOPE,scope); + } service_tracker_customizer_pt customizer = NULL; status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); @@ -259,10 +221,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, sigaction(SIGUSR1,&actions,NULL); - if (status == CELIX_SUCCESS) { - *out=ts; - pubsub_topicSubscriptionSetSerializer(ts, serializer); - } + *out=ts; return status; } @@ -276,8 +235,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ serviceTracker_destroy(ts->tracker); arrayList_clear(ts->sub_ep_list); arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->msgSerializerMapMap,false,false); - hashMap_destroy(ts->bundleMap,false,false); + /* TODO: Destroy all the serializer maps? */ + hashMap_destroy(ts->servicesMap,false,false); celixThreadMutex_lock(&ts->pendingConnections_lock); arrayList_destroy(ts->pendingConnections); @@ -289,18 +248,17 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ celixThreadMutex_unlock(&ts->pendingDisconnections_lock); celixThreadMutex_destroy(&ts->pendingDisconnections_lock); - #ifdef BUILD_WITH_ZMQ_SECURITY - zcert_destroy(&(ts->zmq_cert)); - zcert_destroy(&(ts->zmq_pub_cert)); - #endif - - celixThreadMutex_unlock(&ts->ts_lock); - celixThreadMutex_lock(&ts->socket_lock); zsock_destroy(&(ts->zmq_socket)); +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_destroy(&(ts->zmq_cert)); + zcert_destroy(&(ts->zmq_pub_cert)); +#endif celixThreadMutex_unlock(&ts->socket_lock); celixThreadMutex_destroy(&ts->socket_lock); + celixThreadMutex_unlock(&ts->ts_lock); + free(ts); @@ -310,8 +268,6 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ celix_status_t status = CELIX_SUCCESS; - //celixThreadMutex_lock(&ts->ts_lock); - status = serviceTracker_open(ts->tracker); ts->running = true; @@ -320,16 +276,12 @@ celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts); } - //celixThreadMutex_unlock(&ts->ts_lock); - return status; } celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ celix_status_t status = CELIX_SUCCESS; - //celixThreadMutex_lock(&ts->ts_lock); - ts->running = false; pthread_kill(ts->recv_thread.thread,SIGUSR1); @@ -338,15 +290,13 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ status = serviceTracker_close(ts->tracker); - //celixThreadMutex_unlock(&ts->ts_lock); - return status; } celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){ + if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,"%s",pubURL) != 0){ status = CELIX_SERVICE_EXCEPTION; } celixThreadMutex_unlock(&ts->socket_lock); @@ -355,28 +305,28 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts } celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_add(ts->pendingConnections, url); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - return status; + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_add(ts->pendingConnections, url); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + return status; } celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_add(ts->pendingDisconnections, url); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - return status; + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_add(ts->pendingDisconnections, url); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + return status; } celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){ + if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,"%s",pubURL) != 0){ status = CELIX_SERVICE_EXCEPTION; } celixThreadMutex_unlock(&ts->socket_lock); @@ -388,9 +338,7 @@ celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, p celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - arrayList_add(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -401,9 +349,7 @@ celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers++; - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -413,22 +359,17 @@ celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - arrayList_removeElement(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); return status; - } celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers--; - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -438,152 +379,114 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { return ts->nrSubscribers; } -celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - //clear old - if (ts->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); - pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); - - } - } - ts->serializerSvc = serializerSvc; - //init new - if (ts->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter); - bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc); - pubsub_msg_serializer_map_t* map = NULL; - ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); - hashMap_put(ts->msgSerializerMapMap, subsvc, map); - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){ + return sub->sub_ep_list; } -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* svc){ +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (ts->serializerSvc == svc) { //only act if svc removed is services used - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); - pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); - } - ts->serializerSvc = NULL; - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} + if (!hashMap_containsKey(ts->servicesMap, service)) { + bundle_pt bundle = NULL; + hash_map_pt msgTypes = NULL; -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc) { - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; + serviceReference_getBundle(reference, &bundle); - celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) { - bundle_pt bundle = NULL; - serviceReference_getBundle(reference, &bundle); - - if (ts->serializerSvc != NULL) { - pubsub_msg_serializer_map_t* map = NULL; - ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); - if (map != NULL) { - hashMap_put(ts->msgSerializerMapMap, svc, map); - hashMap_put(ts->bundleMap, svc, bundle); - } - } - } + if(ts->serializer != NULL && bundle!=NULL){ + ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes); + if(msgTypes != NULL){ + hashMap_put(ts->servicesMap, service, msgTypes); + printf("PSA_ZMQ_TS: New subscriber registered.\n"); + } + } + else{ + printf("PSA_ZMQ_TS: Cannot register new subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; + } + } celixThreadMutex_unlock(&ts->ts_lock); - printf("TS: New subscriber registered.\n"); - return status; + return status; } -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc) { +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) { - pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc); - if (ts->serializerSvc != NULL){ - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - hashMap_remove(ts->bundleMap, svc); - hashMap_remove(ts->msgSerializerMapMap, svc); - } - } + if (hashMap_containsKey(ts->servicesMap, service)) { + hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); + if(msgTypes!=NULL && ts->serializer!=NULL){ + ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes); + printf("PSA_ZMQ_TS: Subscriber unregistered.\n"); + } + else{ + printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; + } + } celixThreadMutex_unlock(&ts->ts_lock); - printf("TS: Subscriber unregistered.\n"); return status; } -static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) { +static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); - hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); + hash_map_pt msgTypes = hashMapEntry_getValue(entry); - pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t )first_msg_hdr->type); + pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type); if (msgSer == NULL) { - printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n", first_msg_hdr->type); - } else { + printf("PSA_ZMQ_TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type); + } + else{ void *msgInst = NULL; - bool validVersion = checkVersion(msgSer->msgVersion, first_msg_hdr); + bool validVersion = checkVersion(msgSer->msgVersion,first_msg_hdr); + if(validVersion){ - celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst); + + celix_status_t status = msgSer->deserialize(msgSer, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst); if (status == CELIX_SUCCESS) { bool release = true; - - mp_handle_t* mp_handle = create_mp_handle(sub, map, msg_list); + mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list); pubsub_multipart_callbacks_t mp_callbacks; mp_callbacks.handle = mp_handle; mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; mp_callbacks.getMultipart = pubsub_getMultipart; subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release); - if (release) { - msgSer->freeMsg(msgSer->handle, msgInst); + if(release){ + msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst); } - if (mp_handle!=NULL) { + if(mp_handle!=NULL){ destroy_mp_handle(mp_handle); } } else{ - printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName); + printf("PSA_ZMQ_TS: Cannot deserialize msgType %s.\n",msgSer->msgName); } - } else { + } + else{ int major=0,minor=0; - version_getMajor(msgSer->msgVersion, &major); - version_getMinor(msgSer->msgVersion, &minor); - printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", - msgSer->msgName, major, minor, first_msg_hdr->major, first_msg_hdr->minor); + version_getMajor(msgSer->msgVersion,&major); + version_getMinor(msgSer->msgVersion,&minor); + printf("PSA_ZMQ_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor); } + } } + hashMapIterator_destroy(iter); int i = 0; for(;i<arrayList_size(msg_list);i++){ @@ -598,21 +501,21 @@ static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) { } static void* zmq_recv_thread_func(void * arg) { - topic_subscription_pt sub = (topic_subscription_pt) arg; + topic_subscription_pt sub = (topic_subscription_pt) arg; - while (sub->running) { + while (sub->running) { - celixThreadMutex_lock(&sub->socket_lock); + celixThreadMutex_lock(&sub->socket_lock); - zframe_t* headerMsg = zframe_recv(sub->zmq_socket); - if (headerMsg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("TS: header_recv thread for topic got a signal and will exit.\n"); - } else { - perror("TS: header_recv thread"); - } - } else { + zframe_t* headerMsg = zframe_recv(sub->zmq_socket); + if (headerMsg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n"); + } else { + perror("PSA_ZMQ_TS: header_recv thread"); + } + } else { pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg); @@ -622,9 +525,9 @@ static void* zmq_recv_thread_func(void * arg) { if (payloadMsg == NULL) { if (errno == EINTR) { //It means we got a signal and we have to exit... - printf("TS: payload_recv thread for topic got a signal and will exit.\n"); + printf("PSA_ZMQ_TS: payload_recv thread for topic got a signal and will exit.\n"); } else { - perror("TS: payload_recv"); + perror("PSA_ZMQ_TS: payload_recv"); } zframe_destroy(&headerMsg); } else { @@ -644,9 +547,9 @@ static void* zmq_recv_thread_func(void * arg) { if (h_msg == NULL) { if (errno == EINTR) { //It means we got a signal and we have to exit... - printf("TS: h_recv thread for topic got a signal and will exit.\n"); + printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n"); } else { - perror("TS: h_recv"); + perror("PSA_ZMQ_TS: h_recv"); } break; } @@ -655,9 +558,9 @@ static void* zmq_recv_thread_func(void * arg) { if (p_msg == NULL) { if (errno == EINTR) { //It means we got a signal and we have to exit... - printf("TS: p_recv thread for topic got a signal and will exit.\n"); + printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n"); } else { - perror("TS: p_recv"); + perror("PSA_ZMQ_TS: p_recv"); } zframe_destroy(&h_msg); break; @@ -682,16 +585,16 @@ static void* zmq_recv_thread_func(void * arg) { } //zframe_more(headerMsg) else { free(headerMsg); - printf("TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic); + printf("PSA_ZMQ_TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic); } - } // headerMsg != NULL - celixThreadMutex_unlock(&sub->socket_lock); - connectPendingPublishers(sub); - disconnectPendingPublishers(sub); - } // while + } // headerMsg != NULL + celixThreadMutex_unlock(&sub->socket_lock); + connectPendingPublishers(sub); + disconnectPendingPublishers(sub); + } // while - return NULL; + return NULL; } static void connectPendingPublishers(topic_subscription_pt sub) { @@ -715,7 +618,7 @@ static void disconnectPendingPublishers(topic_subscription_pt sub) { } static void sigusr1_sighandler(int signo){ - printf("TS: Topic subscription being shut down...\n"); + printf("PSA_ZMQ_TS: Topic subscription being shut down...\n"); return; } @@ -746,7 +649,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain return -1; } - mp_handle_t* mp_handle = handle; + mp_handle_pt mp_handle = (mp_handle_pt)handle; msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId); if(entry!=NULL){ entry->retain = retain; @@ -762,59 +665,66 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain } -static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) { +static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){ if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message return NULL; } - mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle)); - mp_handle->map = map; + mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle)); + mp_handle->svc_msg_db = svc_msg_db; mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL); - int i; //We skip the first message, it will be handle differently - for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) { - complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i); + int i=1; //We skip the first message, it will be handle differently + for(;i<arrayList_size(rcv_msg_list);i++){ + complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i); pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header); - pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t)(header->type)); - if (msgSer != NULL) { + pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, (void*)(uintptr_t)(header->type)); + + if (msgSer!= NULL) { void *msgInst = NULL; - bool validVersion = checkVersion(msgSer->msgVersion, header); + + bool validVersion = checkVersion(msgSer->msgVersion,header); + if(validVersion){ - //TODO make the getMultipart lazy? - celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 0, &msgInst); + celix_status_t status = msgSer->deserialize(msgSer->handle, (const void*)zframe_data(c_msg->payload), 0, &msgInst); if(status == CELIX_SUCCESS){ msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry)); entry->msgInst = msgInst; - hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)(header->type), entry); + hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)header->type,entry); } } } } + return mp_handle; + } -static void destroy_mp_handle(mp_handle_t* mp_handle){ +static void destroy_mp_handle(mp_handle_pt mp_handle){ hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map); while(hashMapIterator_hasNext(iter)){ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry); msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry); - pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId); - if (msgSer != NULL) { - if (!msgEntry->retain) { - msgSer->freeMsg(msgSer->handle, msgEntry->msgInst); + pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId); + + if(msgSer!=NULL){ + if(!msgEntry->retain){ + msgSer->freeMsg(msgSer->handle,msgEntry->msgInst); } } else{ - printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n", msgId); + printf("PSA_ZMQ_TS: ERROR: Cannot find messageSerializer for msg %u, so cannot destroy it!\n",msgId); } + + free(msgEntry); } hashMapIterator_destroy(iter); - hashMap_destroy(mp_handle->rcv_msg_map,true,true); + hashMap_destroy(mp_handle->rcv_msg_map,false,false); free(mp_handle); }
http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h index f7ab7e0..f24d825 100644 --- a/pubsub/pubsub_common/public/include/pubsub_admin.h +++ b/pubsub/pubsub_common/public/include/pubsub_admin.h @@ -31,13 +31,12 @@ #include "pubsub_common.h" #include "pubsub_endpoint.h" -#include "pubsub_serializer.h" #define PSA_IP "PSA_IP" #define PSA_ITF "PSA_INTERFACE" #define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" -#define PSA_DEFAULT "zmq" +#define PUBSUB_ADMIN_TYPE_KEY "pubsub_admin.type" typedef struct pubsub_admin *pubsub_admin_pt; @@ -53,12 +52,19 @@ struct pubsub_admin_service { celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); - celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); - celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); - - celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); - celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); - + /* Match principle: + * - A full matching pubsub_admin gives 200 points + * - A full matching serializer gives 100 points + * - If QoS = sample + * - fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75. + * - fallback serializers order of selection is: json, void. Points allocation is 30,20. + * - If QoS = control + * - fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75. + * - fallback serializers order of selection is: json, void. Points allocation is 30,20. + * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two. + * + */ + celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); }; typedef struct pubsub_admin_service *pubsub_admin_service_pt; http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_admin_match.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_admin_match.h b/pubsub/pubsub_common/public/include/pubsub_admin_match.h new file mode 100644 index 0000000..a366c34 --- /dev/null +++ b/pubsub/pubsub_common/public/include/pubsub_admin_match.h @@ -0,0 +1,27 @@ +/* + * pubsub_admin_match.h + * + * Created on: Sep 4, 2017 + * Author: dn234 + */ + +#ifndef PUBSUB_ADMIN_MATCH_H_ +#define PUBSUB_ADMIN_MATCH_H_ + +#include "celix_errno.h" +#include "properties.h" +#include "array_list.h" + +#include "pubsub_serializer.h" + +#define QOS_ATTRIBUTE_KEY "attribute.qos" +#define QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */ +#define QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */ + +#define PUBSUB_ADMIN_FULL_MATCH_SCORE 200.0F +#define SERIALIZER_FULL_MATCH_SCORE 100.0F + +celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score); +celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc); + +#endif /* PUBSUB_ADMIN_MATCH_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_common.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h b/pubsub/pubsub_common/public/include/pubsub_common.h index 46abd72..5dfd8fd 100644 --- a/pubsub/pubsub_common/public/include/pubsub_common.h +++ b/pubsub/pubsub_common/public/include/pubsub_common.h @@ -32,12 +32,12 @@ #define PUBSUB_DISCOVERY_SERVICE "pubsub_discovery" #define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE "pubsub_tm_announce_publisher" -#define PUBSUB_ANY_SUB_TOPIC "any" +#define PUBSUB_ANY_SUB_TOPIC "any" -#define PUBSUB_BUNDLE_ID "bundle.id" +#define PUBSUB_BUNDLE_ID "bundle.id" -#define MAX_SCOPE_LEN 1024 -#define MAX_TOPIC_LEN 1024 +#define MAX_SCOPE_LEN 1024 +#define MAX_TOPIC_LEN 1024 struct pubsub_msg_header{ char topic[MAX_TOPIC_LEN]; http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h b/pubsub/pubsub_common/public/include/pubsub_endpoint.h index 193b3fd..8a979eb 100644 --- a/pubsub/pubsub_common/public/include/pubsub_endpoint.h +++ b/pubsub/pubsub_common/public/include/pubsub_endpoint.h @@ -28,6 +28,11 @@ #define PUBSUB_ENDPOINT_H_ #include "service_reference.h" +#include "listener_hook_service.h" +#include "properties.h" + +#include "publisher.h" +#include "subscriber.h" struct pubsub_endpoint { char *frameworkUUID; @@ -36,12 +41,15 @@ struct pubsub_endpoint { long serviceID; char* endpoint; bool is_secure; + properties_pt topic_props; }; typedef struct pubsub_endpoint *pubsub_endpoint_pt; -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,pubsub_endpoint_pt* psEp); -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_serializer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h index e9f9f6c..4489fa4 100644 --- a/pubsub/pubsub_common/public/include/pubsub_serializer.h +++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h @@ -28,9 +28,12 @@ #define PUBSUB_SERIALIZER_SERVICE_H_ #include "service_reference.h" +#include "hash_map.h" #include "pubsub_common.h" +#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub_serializer.type" + /** * There should be a pubsub_serializer_t * per msg type (msg id) per bundle @@ -39,28 +42,24 @@ * a serializer_map per bundle. Potentially using * the extender pattern. */ + typedef struct pubsub_msg_serializer { - void* handle; - unsigned int msgId; - const char* msgName; - version_pt msgVersion; + void* handle; + unsigned int msgId; + const char* msgName; + version_pt msgVersion; - celix_status_t (*serialize)(void* handle, const void* input, char** out, size_t* outLen); - celix_status_t (*deserialize)(void* handle, const char* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed + celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen); + celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed + void (*freeMsg)(void* handle, void* msg); - void (*freeMsg)(void* handle, void* msg); } pubsub_msg_serializer_t; -typedef struct pubsub_msg_serializer_map { - bundle_pt bundle; - hash_map_pt serializers; //key = msg id (unsigned int), value = pubsub_serializer_t* -} pubsub_msg_serializer_map_t; - typedef struct pubsub_serializer_service { void* handle; - celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, pubsub_msg_serializer_map_t** out); - celix_status_t (*destroySerializerMap)(void* handle, pubsub_msg_serializer_map_t* map); + celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap); + celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap); } pubsub_serializer_service_t; http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/log_helper.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/src/log_helper.c b/pubsub/pubsub_common/public/src/log_helper.c index dbd1cc3..7a63363 100644 --- a/pubsub/pubsub_common/public/src/log_helper.c +++ b/pubsub/pubsub_common/public/src/log_helper.c @@ -149,6 +149,9 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) { return status; } + + + celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... ) { celix_status_t status = CELIX_SUCCESS; @@ -166,6 +169,7 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m int i = 0; for (; i < arrayList_size(loghelper->logServices); i++) { + log_service_pt logService = arrayList_get(loghelper->logServices, i); if (logService != NULL) { @@ -175,31 +179,31 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m } pthread_mutex_unlock(&loghelper->logListLock); + } - if (!logged && loghelper->stdOutFallback) { - char *levelStr = NULL; - - switch (level) { - case OSGI_LOGSERVICE_ERROR: - levelStr = "ERROR"; - break; - case OSGI_LOGSERVICE_WARNING: - levelStr = "WARNING"; - break; - case OSGI_LOGSERVICE_INFO: - levelStr = "INFO"; - break; - case OSGI_LOGSERVICE_DEBUG: - default: - levelStr = "DEBUG"; - break; - } - printf("%s: %s\n", levelStr, msg); - } - } + if (!logged && loghelper->stdOutFallback) { + char *levelStr = NULL; + + switch (level) { + case OSGI_LOGSERVICE_ERROR: + levelStr = "ERROR"; + break; + case OSGI_LOGSERVICE_WARNING: + levelStr = "WARNING"; + break; + case OSGI_LOGSERVICE_INFO: + levelStr = "INFO"; + break; + case OSGI_LOGSERVICE_DEBUG: + default: + levelStr = "DEBUG"; + break; + } + + printf("%s: %s\n", levelStr, msg); + } - va_end(listPointer); return status; } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/pubsub_admin_match.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/src/pubsub_admin_match.c b/pubsub/pubsub_common/public/src/pubsub_admin_match.c new file mode 100644 index 0000000..bb555b7 --- /dev/null +++ b/pubsub/pubsub_common/public/src/pubsub_admin_match.c @@ -0,0 +1,303 @@ +/* + * pubsub_admin_match.c + + * + * Created on: Sep 4, 2017 + * Author: dn234 + */ + +#include <string.h> +#include "service_reference.h" + +#include "pubsub_admin.h" + +#include "pubsub_admin_match.h" + +#define KNOWN_PUBSUB_ADMIN_NUM 2 +#define KNOWN_SERIALIZER_NUM 2 + +static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"}; +static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; + +static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"}; +static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; + +static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F}; +static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F}; + +static void get_serializer_type(service_reference_pt svcRef, char **serializerType); +static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService); + +celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){ + + celix_status_t status = CELIX_SUCCESS; + double final_score = 0; + int i = 0, j = 0; + + const char *requested_admin_type = NULL; + const char *requested_serializer_type = NULL; + const char *requested_qos_type = NULL; + + if(endpoint_props!=NULL){ + requested_admin_type = properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY); + requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); + requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); + } + + /* Analyze the pubsub_admin */ + if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */ + if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match + final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE; + } + } + else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */ + if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ + for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ + if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ + final_score += qos_pubsub_admin_score[i]; + break; + } + } + } + else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ + for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ + if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ + final_score += qos_pubsub_admin_score[i]; + break; + } + } + } + else{ + printf("Unknown QoS type '%s'\n",requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; + } + } + else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ + for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ + if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ + final_score += (qos_pubsub_admin_score[i]/2); + break; + } + } + } + + char *serializer_type = NULL; + /* Analyze the serializers */ + if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ + for(i=0;i<arrayList_size(serializerList);i++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ + final_score += SERIALIZER_FULL_MATCH_SCORE; + break; + } + } + } + } + else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ + if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + ser_found = true; + } + } + } + if(ser_found){ + final_score += qos_serializer_score[i]; + } + } + } + else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + ser_found = true; + } + } + } + if(ser_found){ + final_score += qos_serializer_score[i]; + } + } + } + else{ + printf("Unknown QoS type '%s'\n",requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; + } + } + else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + ser_found = true; + } + } + } + if(ser_found){ + final_score += (qos_serializer_score[i]/2); + } + } + } + + *score = final_score; + + printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score); + + return status; +} + +celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){ + celix_status_t status = CELIX_SUCCESS; + + int i = 0, j = 0; + + const char *requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); + const char *requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); + + service_reference_pt svcRef = NULL; + void *svc = NULL; + + /* Analyze the serializers */ + if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ + for(i=0;i<arrayList_size(serializerList);i++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,i); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + *serSvc = svc; + break; + } + } + } + } + else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ + if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,j); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + else{ + *serSvc = svc; + ser_found = true; + printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE); + } + } + } + } + } + } + else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,j); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + else{ + *serSvc = svc; + ser_found = true; + printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL); + } + } + } + } + } + } + else{ + printf("Unknown QoS type '%s'\n",requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; + } + } + else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,j); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + else{ + *serSvc = svc; + ser_found = true; + printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]); + } + } + } + } + } + } + + if(svc!=NULL && svcRef!=NULL){ + manage_service_from_reference(svcRef, svc, false); + } + + return status; +} + +static void get_serializer_type(service_reference_pt svcRef, char **serializerType){ + + const char *serType = NULL; + serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType != NULL){ + *serializerType = (char*)serType; + } + else{ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef); + *serializerType = NULL; + } +} + +static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){ + bundle_context_pt context = NULL; + bundle_pt bundle = NULL; + serviceReference_getBundle(svcRef, &bundle); + bundle_getContext(bundle, &context); + if(getService){ + bundleContext_getService(context, svcRef, svc); + } + else{ + bundleContext_ungetService(context, svcRef, NULL); + } +} http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/pubsub/pubsub_common/public/src/pubsub_endpoint.c index ebb330e..f6776d5 100644 --- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c +++ b/pubsub/pubsub_common/public/src/pubsub_endpoint.c @@ -33,41 +33,101 @@ #include "pubsub_common.h" #include "pubsub_endpoint.h" #include "constants.h" -#include "subscriber.h" -celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* out) { - celix_status_t status = CELIX_SUCCESS; +#include "pubsub_utils.h" - pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp)); - if (fwUUID != NULL) { - psEp->frameworkUUID = strdup(fwUUID); - } +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps); +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher); - if (scope != NULL) { - psEp->scope = strdup(scope); - } +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){ - if (topic != NULL) { - psEp->topic = strdup(topic); - } + if (fwUUID != NULL) { + psEp->frameworkUUID = strdup(fwUUID); + } + + if (scope != NULL) { + psEp->scope = strdup(scope); + } + + if (topic != NULL) { + psEp->topic = strdup(topic); + } + + psEp->serviceID = serviceId; + + if(endpoint != NULL) { + psEp->endpoint = strdup(endpoint); + } + + if(topic_props != NULL){ + if(cloneProps){ + properties_copy(topic_props, &(psEp->topic_props)); + } + else{ + psEp->topic_props = topic_props; + } + } +} + +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){ + + properties_pt topic_props = NULL; + + bool isSystemBundle = false; + bundle_isSystemBundle(bundle, &isSystemBundle); + long bundleId = -1; + bundle_isSystemBundle(bundle, &isSystemBundle); + bundle_getBundleId(bundle,&bundleId); + + if(isSystemBundle == false) { - psEp->serviceID = serviceId; + char *bundleRoot = NULL; + char* topicPropertiesPath = NULL; + bundle_getEntry(bundle, ".", &bundleRoot); - if (endpoint != NULL) { - psEp->endpoint = strdup(endpoint); - } + if(bundleRoot != NULL){ - *out = psEp; + asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic); + topic_props = properties_load(topicPropertiesPath); + if(topic_props==NULL){ + printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId); + } - return status; + free(topicPropertiesPath); + free(bundleRoot); + } + } + + return topic_props; +} + +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){ + celix_status_t status = CELIX_SUCCESS; + + *psEp = calloc(1, sizeof(**psEp)); + + pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true); + + return status; } -celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* out){ +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){ celix_status_t status = CELIX_SUCCESS; - pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp)); + *out = calloc(1,sizeof(**out)); + + pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true); + + return status; + +} + +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){ + celix_status_t status = CELIX_SUCCESS; + + *psEp = calloc(1,sizeof(**psEp)); bundle_pt bundle = NULL; bundle_context_pt ctxt = NULL; @@ -85,49 +145,86 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re const char* serviceId = NULL; serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId); - if(fwUUID!=NULL){ - psEp->frameworkUUID = strdup(fwUUID); - } + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ + properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); - if(scope!=NULL){ - psEp->scope = strdup(scope); - } else { - psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT); - } + pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false); - if(topic!=NULL){ - psEp->topic = strdup(topic); + if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope || !(*psEp)->topic) { + fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); + status = CELIX_BUNDLE_EXCEPTION; } - if(serviceId!=NULL){ - psEp->serviceID = strtol(serviceId,NULL,10); - } + return status; - if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || !psEp->topic) { - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); - status = CELIX_BUNDLE_EXCEPTION; +} + +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){ + celix_status_t status = CELIX_SUCCESS; + + const char* fwUUID=NULL; + bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + + char* topic = pubsub_getTopicFromFilter(info->filter); + if(topic==NULL || fwUUID==NULL){ + return CELIX_BUNDLE_EXCEPTION; } - if (status != CELIX_SUCCESS) { - pubsubEndpoint_destroy(psEp); - } else { - *out = psEp; + *psEp = calloc(1, sizeof(**psEp)); + + char* scope = pubsub_getScopeFromFilter(info->filter); + if(scope == NULL) { + scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); } - return status; + bundle_pt bundle = NULL; + long bundleId = -1; + bundleContext_getBundle(info->context,&bundle); + bundle_getBundleId(bundle,&bundleId); + + properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); + + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ + pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false); + + free(topic); + free(scope); + + + return status; } celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ - if (psEp != NULL) { + + if(psEp->frameworkUUID!=NULL){ free(psEp->frameworkUUID); + psEp->frameworkUUID = NULL; + } + + if(psEp->scope!=NULL){ free(psEp->scope); + psEp->scope = NULL; + } + + if(psEp->topic!=NULL){ free(psEp->topic); + psEp->topic = NULL; + } + + if(psEp->endpoint!=NULL){ free(psEp->endpoint); + psEp->endpoint = NULL; } + + if(psEp->topic_props != NULL){ + properties_destroy(psEp->topic_props); + } + free(psEp); return CELIX_SUCCESS; + } bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ @@ -138,7 +235,6 @@ bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ (psEp1->serviceID == psEp2->serviceID) /*&& ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/ ); - } char *createScopeTopicKey(const char* scope, const char* topic) { http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h index d5be8d6..676a6ab 100644 --- a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h +++ b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h @@ -58,7 +58,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery); celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP); celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP); -celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service); celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service); celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service); celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service); http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_common.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/etcd_common.c b/pubsub/pubsub_discovery/private/src/etcd_common.c index a53a844..c757801 100644 --- a/pubsub/pubsub_discovery/private/src/etcd_common.c +++ b/pubsub/pubsub_discovery/private/src/etcd_common.c @@ -32,6 +32,7 @@ #include "pubsub_discovery.h" #include "pubsub_discovery_impl.h" + #define MAX_ROOTNODE_LENGTH 128 #define MAX_LOCALNODE_LENGTH 4096 #define MAX_FIELD_LENGTH 128 http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/pubsub/pubsub_discovery/private/src/etcd_watcher.c index 13ba3aa..3c3a5a8 100644 --- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c +++ b/pubsub/pubsub_discovery/private/src/etcd_watcher.c @@ -57,17 +57,17 @@ struct etcd_watcher { celix_thread_mutex_t watcherLock; celix_thread_t watcherThread; - char *scope; + char *scope; char *topic; volatile bool running; }; struct etcd_writer { - pubsub_discovery_pt pubsub_discovery; - celix_thread_mutex_t localPubsLock; - array_list_pt localPubs; - volatile bool running; - celix_thread_t writerThread; + pubsub_discovery_pt pubsub_discovery; + celix_thread_mutex_t localPubsLock; + array_list_pt localPubs; + volatile bool running; + celix_thread_t writerThread; }; @@ -77,41 +77,41 @@ static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, co const char* rootPath = NULL; if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { - snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic); + snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic); } else { - snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic); + snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic); } return status; } static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) { - celix_status_t status = CELIX_SUCCESS; - const char* rootPath = NULL; + celix_status_t status = CELIX_SUCCESS; + const char* rootPath = NULL; - if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { - strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH); - } else { - strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH); - } + if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { + strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH); + } else { + strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH); + } - return status; + return status; } static void add_node(const char *key, const char *value, void* arg) { - pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg; - pubsub_endpoint_pt pubEP = NULL; - celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP); - if(!status && pubEP) { - pubsub_discovery_addNode(ps_discovery, pubEP); - } + pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg; + pubsub_endpoint_pt pubEP = NULL; + celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP); + if(!status && pubEP) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } } static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) { celix_status_t status = CELIX_SUCCESS; if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) { - status = CELIX_ILLEGAL_ARGUMENT; + status = CELIX_ILLEGAL_ARGUMENT; } return status; } @@ -137,14 +137,14 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath); if(expr) { - int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId); - free(expr); - if (foundItems != 4) { // Could happen when a directory is removed, just don't process this. - status = CELIX_ILLEGAL_STATE; - } - else{ - status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,pubEP); - } + int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId); + free(expr); + if (foundItems != 4) { // Could happen when a directory is removed, just don't process this. + status = CELIX_ILLEGAL_STATE; + } + else{ + status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP); + } } return status; } @@ -154,75 +154,75 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu * changing discovery endpoint information within etcd. */ static void* etcdWatcher_run(void* data) { - etcd_watcher_pt watcher = (etcd_watcher_pt) data; - time_t timeBeforeWatch = time(NULL); - char rootPath[MAX_ROOTNODE_LENGTH]; - long long highestModified = 0; - - pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery; - bundle_context_pt context = ps_discovery->context; - - memset(rootPath, 0, MAX_ROOTNODE_LENGTH); - - //TODO: add topic to etcd key - etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); - etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified); - - while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) { - - char *rkey = NULL; - char *value = NULL; - char *preValue = NULL; - char *action = NULL; - long long modIndex; - - celixThreadMutex_unlock(&watcher->watcherLock); - - if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) { - pubsub_endpoint_pt pubEP = NULL; - if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_addNode(ps_discovery, pubEP); - } - } else if (strcmp(action, "delete") == 0) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_removeNode(ps_discovery, pubEP); - } - } else if (strcmp(action, "expire") == 0) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_removeNode(ps_discovery, pubEP); - } - } else if (strcmp(action, "update") == 0) { - if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { - pubsub_discovery_addNode(ps_discovery, pubEP); - } - } else { - fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action); - } - highestModified = modIndex; - } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) { - sleep(DEFAULT_ETCD_TTL / 4); - } - - FREE_MEM(action); - FREE_MEM(value); - FREE_MEM(preValue); - FREE_MEM(rkey); - - /* prevent busy waiting, in case etcd_watch returns false */ - - - if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) { - timeBeforeWatch = time(NULL); - } - - } - - if (watcher->running == false) { - celixThreadMutex_unlock(&watcher->watcherLock); - } - - return NULL; + etcd_watcher_pt watcher = (etcd_watcher_pt) data; + time_t timeBeforeWatch = time(NULL); + char rootPath[MAX_ROOTNODE_LENGTH]; + long long highestModified = 0; + + pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery; + bundle_context_pt context = ps_discovery->context; + + memset(rootPath, 0, MAX_ROOTNODE_LENGTH); + + //TODO: add topic to etcd key + etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); + etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified); + + while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) { + + char *rkey = NULL; + char *value = NULL; + char *preValue = NULL; + char *action = NULL; + long long modIndex; + + celixThreadMutex_unlock(&watcher->watcherLock); + + if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) { + pubsub_endpoint_pt pubEP = NULL; + if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "delete") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_removeNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "expire") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_removeNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "update") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } + } else { + fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action); + } + highestModified = modIndex; + } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) { + sleep(DEFAULT_ETCD_TTL / 4); + } + + FREE_MEM(action); + FREE_MEM(value); + FREE_MEM(preValue); + FREE_MEM(rkey); + + /* prevent busy waiting, in case etcd_watch returns false */ + + + if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) { + timeBeforeWatch = time(NULL); + } + + } + + if (watcher->running == false) { + celixThreadMutex_unlock(&watcher->watcherLock); + } + + return NULL; } celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) { @@ -243,16 +243,18 @@ celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_c (*watcher)->scope = strdup(scope); (*watcher)->topic = strdup(topic); - celixThreadMutex_create(&(*watcher)->watcherLock, NULL); - celixThreadMutex_lock(&(*watcher)->watcherLock); + celixThreadMutex_create(&(*watcher)->watcherLock, NULL); + + celixThreadMutex_lock(&(*watcher)->watcherLock); + + status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher); + if (status == CELIX_SUCCESS) { + (*watcher)->running = true; + } - status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher); - if (status == CELIX_SUCCESS) { - (*watcher)->running = true; - } + celixThreadMutex_unlock(&(*watcher)->watcherLock); - celixThreadMutex_unlock(&(*watcher)->watcherLock); return status; } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_writer.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c b/pubsub/pubsub_discovery/private/src/etcd_writer.c index 687d802..1c423f3 100644 --- a/pubsub/pubsub_discovery/private/src/etcd_writer.c +++ b/pubsub/pubsub_discovery/private/src/etcd_writer.c @@ -47,11 +47,11 @@ #define DEFAULT_ETCD_TTL 30 struct etcd_writer { - pubsub_discovery_pt pubsub_discovery; - celix_thread_mutex_t localPubsLock; - array_list_pt localPubs; - volatile bool running; - celix_thread_t writerThread; + pubsub_discovery_pt pubsub_discovery; + celix_thread_mutex_t localPubsLock; + array_list_pt localPubs; + volatile bool running; + celix_thread_t writerThread; }; @@ -60,38 +60,38 @@ static void* etcdWriter_run(void* data); etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) { - etcd_writer_pt writer = calloc(1, sizeof(*writer)); - if(writer) { - celixThreadMutex_create(&writer->localPubsLock, NULL); - arrayList_create(&writer->localPubs); - writer->pubsub_discovery = disc; - writer->running = true; - celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer); - } - return writer; + etcd_writer_pt writer = calloc(1, sizeof(*writer)); + if(writer) { + celixThreadMutex_create(&writer->localPubsLock, NULL); + arrayList_create(&writer->localPubs); + writer->pubsub_discovery = disc; + writer->running = true; + celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer); + } + return writer; } void etcdWriter_destroy(etcd_writer_pt writer) { - char dir[MAX_ROOTNODE_LENGTH]; - const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - - writer->running = false; - celixThread_join(writer->writerThread, NULL); - - celixThreadMutex_lock(&writer->localPubsLock); - for(int i = 0; i < arrayList_size(writer->localPubs); i++) { - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i); - memset(dir,0,MAX_ROOTNODE_LENGTH); - snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID); - etcd_del(dir); - pubsubEndpoint_destroy(pubEP); - } - arrayList_destroy(writer->localPubs); - - celixThreadMutex_unlock(&writer->localPubsLock); - celixThreadMutex_destroy(&(writer->localPubsLock)); - - free(writer); + char dir[MAX_ROOTNODE_LENGTH]; + const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); + + writer->running = false; + celixThread_join(writer->writerThread, NULL); + + celixThreadMutex_lock(&writer->localPubsLock); + for(int i = 0; i < arrayList_size(writer->localPubs); i++) { + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i); + memset(dir,0,MAX_ROOTNODE_LENGTH); + snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID); + etcd_del(dir); + pubsubEndpoint_destroy(pubEP); + } + arrayList_destroy(writer->localPubs); + + celixThreadMutex_unlock(&writer->localPubsLock); + celixThreadMutex_destroy(&(writer->localPubsLock)); + + free(writer); } celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){ @@ -101,11 +101,11 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end const char *fwUUID = NULL; bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) { - celixThreadMutex_lock(&writer->localPubsLock); - pubsub_endpoint_pt p = NULL; - pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p); - arrayList_add(writer->localPubs,p); - celixThreadMutex_unlock(&writer->localPubsLock); + celixThreadMutex_lock(&writer->localPubsLock); + pubsub_endpoint_pt p = NULL; + pubsubEndpoint_clone(pubEP, &p); + arrayList_add(writer->localPubs,p); + celixThreadMutex_unlock(&writer->localPubsLock); } } @@ -138,52 +138,52 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end } celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - char *key = NULL; - - const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); - - asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID); - - celixThreadMutex_lock(&writer->localPubsLock); - for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) { - pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i); - if (pubsubEndpoint_equals(ep, pubEP)) { - arrayList_remove(writer->localPubs, i); - pubsubEndpoint_destroy(ep); - break; - } - } - celixThreadMutex_unlock(&writer->localPubsLock); - - if (etcd_del(key)) { - printf("Failed to remove key %s from ETCD\n",key); - status = CELIX_ILLEGAL_ARGUMENT; - } - FREE_MEM(key); - return status; + celix_status_t status = CELIX_SUCCESS; + char *key = NULL; + + const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); + + asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID); + + celixThreadMutex_lock(&writer->localPubsLock); + for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) { + pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i); + if (pubsubEndpoint_equals(ep, pubEP)) { + arrayList_remove(writer->localPubs, i); + pubsubEndpoint_destroy(ep); + break; + } + } + celixThreadMutex_unlock(&writer->localPubsLock); + + if (etcd_del(key)) { + printf("Failed to remove key %s from ETCD\n",key); + status = CELIX_ILLEGAL_ARGUMENT; + } + FREE_MEM(key); + return status; } static void* etcdWriter_run(void* data) { - etcd_writer_pt writer = (etcd_writer_pt)data; - while(writer->running) { - celixThreadMutex_lock(&writer->localPubsLock); - for(int i=0; i < arrayList_size(writer->localPubs); i++) { - etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false); - } - celixThreadMutex_unlock(&writer->localPubsLock); - sleep(DEFAULT_ETCD_TTL / 2); - } - - return NULL; + etcd_writer_pt writer = (etcd_writer_pt)data; + while(writer->running) { + celixThreadMutex_lock(&writer->localPubsLock); + for(int i=0; i < arrayList_size(writer->localPubs); i++) { + etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false); + } + celixThreadMutex_unlock(&writer->localPubsLock); + sleep(DEFAULT_ETCD_TTL / 2); + } + + return NULL; } static const char* etcdWriter_getRootPath(bundle_context_pt context) { - const char* rootPath = NULL; - bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath); - if(rootPath == NULL) { - rootPath = DEFAULT_ETCD_ROOTPATH; - } - return rootPath; + const char* rootPath = NULL; + bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath); + if(rootPath == NULL) { + rootPath = DEFAULT_ETCD_ROOTPATH; + } + return rootPath; } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/psd_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/psd_activator.c b/pubsub/pubsub_discovery/private/src/psd_activator.c index afbe282..89a517d 100644 --- a/pubsub/pubsub_discovery/private/src/psd_activator.c +++ b/pubsub/pubsub_discovery/private/src/psd_activator.c @@ -48,7 +48,7 @@ static celix_status_t createTMPublisherAnnounceTracker(struct activator *activat service_tracker_customizer_pt customizer = NULL; status = serviceTrackerCustomizer_create(activator->pubsub_discovery, - pubsub_discovery_tmPublisherAnnounceAdding, + NULL, pubsub_discovery_tmPublisherAnnounceAdded, pubsub_discovery_tmPublisherAnnounceModified, pubsub_discovery_tmPublisherAnnounceRemoved, http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c index 0c7d6c4..94a8e11 100644 --- a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c +++ b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c @@ -167,7 +167,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { hashMapIterator_destroy(iter); hashMap_destroy(ps_discovery->watchers, true, true); celixThreadMutex_unlock(&ps_discovery->watchersMutex); - return status; } @@ -293,7 +292,7 @@ celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_ } free(pub_key); pubsub_endpoint_pt p = NULL; - pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p); + pubsubEndpoint_clone(pubEP, &p); arrayList_add(pubEP_list,p); @@ -397,16 +396,6 @@ celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* sc /* pubsub_topology_manager tracker callbacks */ -celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle; - - status = bundleContext_getService(pubsub_discovery->context, reference, service); - - return status; -} - celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) { celix_status_t status = CELIX_SUCCESS; http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h index 5299808..c36f20e 100644 --- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h +++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h @@ -24,8 +24,8 @@ * \copyright Apache License, Version 2.0 */ -#ifndef PUBSUB_SERIALIZER_IMPL_H_ -#define PUBSUB_SERIALIZER_IMPL_H_ +#ifndef PUBSUB_SERIALIZER_JSON_H_ +#define PUBSUB_SERIALIZER_JSON_H_ #include "dyn_common.h" #include "dyn_type.h" @@ -34,25 +34,22 @@ #include "pubsub_serializer.h" +#define PUBSUB_SERIALIZER_TYPE "json" + typedef struct pubsub_serializer { bundle_context_pt bundle_context; log_helper_pt loghelper; } pubsub_serializer_t; -typedef struct pubsub_msg_serializer_impl { - pubsub_msg_serializer_t msgSerializer; - dyn_message_type* dynMsg; -} pubsub_msg_serializer_impl_t; - celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer); celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer); -celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out); -celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, pubsub_msg_serializer_map_t* map); +celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap); +celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, hash_map_pt serializerMap); /* Start of serializer specific functions */ -celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen); -celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out); -void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg); +celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen); +celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out); +void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg); -#endif /* PUBSUB_SERIALIZER_IMPL_H_ */ +#endif /* PUBSUB_SERIALIZER_JSON_H_ */
