CELIX-408: Major refactoring of the pubsub serializer design and usage. example of udpmc and zmq seesm to be working. combi and zmq multicast are not stable yet.
- The serializer is refactored to miminize to needed callback directly to the services. The serializer is now a two step approach. The serializer service can be used to create a map os msg serializers. And the msg serializer are serializer structs (with function ptrs) to serialize a specific msg - Where feasible replace _pt types with _t types. Removing the pointer in the typedef makes it possible to add const info and sizeof calls of the typedef Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7efe4331 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7efe4331 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7efe4331 Branch: refs/heads/develop Commit: 7efe4331aafdfea1df8bd181c136f20acaf52b49 Parents: 97df926 Author: Pepijn Noltes <[email protected]> Authored: Tue Apr 11 21:19:10 2017 +0200 Committer: Pepijn Noltes <[email protected]> Committed: Tue Apr 11 21:19:10 2017 +0200 ---------------------------------------------------------------------- cmake/cmake_celix/BundlePackaging.cmake | 2 +- dfi/private/src/json_serializer.c | 8 +- dfi/public/include/json_serializer.h | 4 +- pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 1 - .../private/include/pubsub_admin_impl.h | 6 +- .../include/pubsub_publish_service_private.h | 8 +- .../private/include/topic_subscription.h | 6 +- .../private/src/pubsub_admin_impl.c | 4 +- .../private/src/topic_publication.c | 173 +++++++------- .../private/src/topic_subscription.c | 158 +++++++------ pubsub/pubsub_admin_zmq/CMakeLists.txt | 1 - .../private/include/pubsub_admin_impl.h | 6 +- .../include/pubsub_publish_service_private.h | 6 +- .../private/include/topic_subscription.h | 6 +- .../private/src/pubsub_admin_impl.c | 4 +- .../private/src/topic_publication.c | 137 ++++++----- .../private/src/topic_subscription.c | 184 ++++++--------- .../public/include/dyn_msg_utils.h | 39 ---- .../pubsub_common/public/include/pubsub_admin.h | 4 +- .../public/include/pubsub_serializer.h | 47 ++-- pubsub/pubsub_common/public/src/dyn_msg_utils.c | 160 ------------- pubsub/pubsub_serializer_json/CMakeLists.txt | 1 - .../private/include/pubsub_serializer_impl.h | 35 ++- .../private/src/ps_activator.c | 20 +- .../private/src/pubsub_serializer_impl.c | 234 +++++++++++++++---- .../private/src/pubsub_topology_manager.c | 6 +- pubsub/test/msg_descriptors/msg.descriptor | 2 +- pubsub/test/test/tst_activator.cpp | 9 +- 28 files changed, 582 insertions(+), 689 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/cmake/cmake_celix/BundlePackaging.cmake ---------------------------------------------------------------------- diff --git a/cmake/cmake_celix/BundlePackaging.cmake b/cmake/cmake_celix/BundlePackaging.cmake index ded1ca5..7eb42fa 100644 --- a/cmake/cmake_celix/BundlePackaging.cmake +++ b/cmake/cmake_celix/BundlePackaging.cmake @@ -319,7 +319,7 @@ function(bundle_libs) if (ADD_TO_MANIFEST) list(APPEND LIBS "$<TARGET_SONAME_FILE_NAME:${LIB}>") endif() - list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>. + list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>. endif() get_target_property(IS_LIB ${BUNDLE} "BUNDLE_TARGET_IS_LIB") http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/private/src/json_serializer.c ---------------------------------------------------------------------- diff --git a/dfi/private/src/json_serializer.c b/dfi/private/src/json_serializer.c index 0c06998..c1cd339 100644 --- a/dfi/private/src/json_serializer.c +++ b/dfi/private/src/json_serializer.c @@ -280,7 +280,7 @@ static int jsonSerializer_parseSequence(dyn_type *seq, json_t *array, void *seqL return status; } -int jsonSerializer_serialize(dyn_type *type, void *input, char **output) { +int jsonSerializer_serialize(dyn_type *type, const void* input, char **output) { int status = OK; json_t *root = NULL; @@ -294,11 +294,11 @@ int jsonSerializer_serialize(dyn_type *type, void *input, char **output) { return status; } -int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out) { - return jsonSerializer_writeAny(type, input, out); +int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out) { + return jsonSerializer_writeAny(type, (void*)input /*TODO update static function to take const void**/, out); } -static int jsonSerializer_writeAny(dyn_type *type, void *input, json_t **out) { +static int jsonSerializer_writeAny(dyn_type *type, void* input, json_t **out) { int status = OK; int descriptor = dynType_descriptorType(type); http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/public/include/json_serializer.h ---------------------------------------------------------------------- diff --git a/dfi/public/include/json_serializer.h b/dfi/public/include/json_serializer.h index c785b01..2f91f2b 100644 --- a/dfi/public/include/json_serializer.h +++ b/dfi/public/include/json_serializer.h @@ -31,7 +31,7 @@ DFI_SETUP_LOG_HEADER(jsonSerializer); int jsonSerializer_deserialize(dyn_type *type, const char *input, void **result); int jsonSerializer_deserializeJson(dyn_type *type, json_t *input, void **result); -int jsonSerializer_serialize(dyn_type *type, void *input, char **output); -int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out); +int jsonSerializer_serialize(dyn_type *type, const void* input, char **output); +int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out); #endif http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt index ce32db0..1ac0c2d 100644 --- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt +++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt @@ -35,7 +35,6 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc private/src/topic_subscription.c private/src/topic_publication.c private/src/large_udp.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c ) http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h index 9eddf15..89e6547 100644 --- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h +++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h @@ -33,7 +33,7 @@ struct pubsub_admin { - pubsub_serializer_service_pt serializerSvc; + pubsub_serializer_service_t* serializerSvc; bundle_context_pt bundle_context; log_helper_pt loghelper; @@ -73,7 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc); -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc); +celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); #endif /* PUBSUB_ADMIN_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h index 81690b8..57d942a 100644 --- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h +++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h @@ -39,17 +39,17 @@ typedef struct pubsub_udp_msg { struct pubsub_msg_header header; unsigned int payloadSize; char payload[]; -} *pubsub_udp_msg_pt; +} pubsub_udp_msg_t; typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out); celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc); -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc); +celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h index 36c902e..1535ae5 100644 --- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h +++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h @@ -38,7 +38,7 @@ typedef struct topic_subscription* topic_subscription_pt; -celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out); +celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out); celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); @@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc); -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc); +celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c index 6e9f4e8..ebfe3e6 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c @@ -645,7 +645,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin return status; } -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; admin->serializerSvc = serializerSvc; @@ -673,7 +673,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize return status; } -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; admin->serializerSvc = NULL; http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c index aa3faf0..be0a433 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c @@ -37,7 +37,6 @@ #include "array_list.h" #include "celixbool.h" #include "service_registration.h" -#include "dyn_msg_utils.h" #include "utils.h" #include "service_factory.h" #include "version.h" @@ -59,7 +58,7 @@ struct topic_publication { hash_map_pt boundServices; //<bundle_pt,bound_service> celix_thread_mutex_t tp_lock; struct sockaddr_in destAddr; - pubsub_serializer_service_pt serializerSvc; + pubsub_serializer_service_t* serializerSvc; }; typedef struct publish_bundle_bound_service { @@ -68,27 +67,27 @@ typedef struct publish_bundle_bound_service { bundle_pt bundle; char *scope; char *topic; - hash_map_pt msgTypes; unsigned short getCount; celix_thread_mutex_t mp_lock; bool mp_send_in_progress; array_list_pt mp_parts; largeUdp_pt largeUdpHandle; -}* publish_bundle_bound_service_pt; + pubsub_msg_serializer_map_t* map; +} publish_bundle_bound_service_t; typedef struct pubsub_msg{ pubsub_msg_header_pt header; char* payload; int payloadSize; -}* pubsub_msg_pt; +} pubsub_msg_t; static unsigned int rand_range(unsigned int min, unsigned int max); static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); +static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc); static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg); @@ -98,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){ char* ep = malloc(EP_ADDRESS_LEN); memset(ep,0,EP_ADDRESS_LEN); @@ -136,7 +135,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); + publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter); pubsub_destroyPublishBundleBoundService(bound); } hashMapIterator_destroy(iter); @@ -223,41 +222,49 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&(pub->tp_lock)); - pub->serializerSvc = serializerSvc; + //clear old serializer + if (pub->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc + while (hashMapIterator_hasNext(&iter)) { + publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); + pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + bound->map = NULL; + } + } - hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(bs_iter)){ - publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter); - if (hashMap_size(boundSvc->msgTypes) == 0){ - pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle); - } + //setup new serializer + pub->serializerSvc = serializerSvc; + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + bundle_pt bundle = hashMapEntry_getKey(entry); + publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry); + pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map); } - hashMapIterator_destroy(bs_iter); celixThreadMutex_unlock(&(pub->tp_lock)); return status; } -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&(pub->tp_lock)); - - hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(bs_iter)){ - publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter); - pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes); - } - hashMapIterator_destroy(bs_iter); - + if (pub->serializerSvc == svc) { + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); + pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + bound->map = NULL; + } + } pub->serializerSvc = NULL; - celixThreadMutex_unlock(&(pub->tp_lock)); return status; @@ -275,18 +282,18 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound==NULL){ - bound = pubsub_createPublishBundleBoundService(publish,bundle); - if(bound!=NULL){ - hashMap_put(publish->boundServices,bundle,bound); + publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle); + if (bound == NULL) { + bound = pubsub_createPublishBundleBoundService(publish, bundle); + if (bound != NULL) { + hashMap_put(publish->boundServices, bundle, bound); } } - else{ + else { bound->getCount++; } - if(bound!=NULL){ + if (bound != NULL) { *service = bound->service; } @@ -301,17 +308,16 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound!=NULL){ + publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle); + if (bound != NULL) { bound->getCount--; - if(bound->getCount==0){ + if (bound->getCount == 0) { pubsub_destroyPublishBundleBoundService(bound); hashMap_remove(publish->boundServices,bundle); } - } - else{ + else { long bundleId = -1; bundle_getBundleId(bundle,&bundleId); printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId); @@ -325,12 +331,11 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p return CELIX_SUCCESS; } -static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt msg, bool last, pubsub_release_callback_t *releaseCallback){ +static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){ const int iovec_len = 3; // header + size + payload bool ret = true; - pubsub_udp_msg_pt udpMsg; - int compiledMsgSize = sizeof(*udpMsg) + msg->payloadSize; + int compiledMsgSize = sizeof(pubsub_udp_msg_t) + msg->payloadSize; struct iovec msg_iovec[iovec_len]; msg_iovec[0].iov_base = msg->header; @@ -348,51 +353,51 @@ static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt ret = false; } - //free(udpMsg); if(releaseCallback) { releaseCallback->release(msg->payload, bound); } return ret; - } static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) { int status = 0; - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; + publish_bundle_bound_service_t* bound = handle; celixThreadMutex_lock(&(bound->parent->tp_lock)); celixThreadMutex_lock(&(bound->mp_lock)); - //TODO //FIXME -> should use pointer to int as identifier, can be many pointers to int .... - printf("TODO FIX usage of msg id's in the serializer hashmap. This seems wrongly based on pointers to uints!!!!\n"); - pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId); - - int major=0, minor=0; + pubsub_msg_serializer_t *msgSer = NULL; + if (bound->map != NULL) { + msgSer = hashMap_get(bound->map->serializers, (void *)(uintptr_t)msgTypeId); + } - if (msgType != NULL && bound->parent->serializerSvc != NULL) { + if (bound->map == NULL) { + printf("TP: Serializer is not set!\n"); + } else if (msgSer == NULL ){ + printf("TP: No msg serializer available for msg type id %d\n", msgTypeId); + } - version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType); + int major=0, minor=0; + if (msgSer != NULL) { pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); - strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); - msg_hdr->type = msgTypeId; - if (msgVersion != NULL){ - version_getMajor(msgVersion, &major); - version_getMinor(msgVersion, &minor); + if (msgSer->msgVersion != NULL){ + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); msg_hdr->major = major; msg_hdr->minor = minor; } - void* serializedOutput = NULL; - int serializedOutputLen = 0; - bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen); + char* serializedOutput = NULL; + size_t serializedOutputLen = 0; + msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen); - pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); + pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg)); msg->header = msg_hdr; - msg->payload = (char *) serializedOutput; + msg->payload = serializedOutput; msg->payloadSize = serializedOutputLen; if(send_pubsub_msg(bound, msg,true, NULL) == false) { @@ -403,11 +408,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con free(serializedOutput); } else { - if (bound->parent->serializerSvc == NULL) { - printf("TP: Serializer is not set!\n"); - } else { - printf("TP: Message %u not supported.\n",msgTypeId); - } + printf("TP: Message %u not supported.\n",msgTypeId); status=-1; } @@ -430,9 +431,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){ } -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ +static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) { - publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); + publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); if (bound != NULL) { bound->service = calloc(1, sizeof(*bound->service)); @@ -445,7 +446,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to bound->getCount = 1; bound->mp_send_in_progress = false; celixThreadMutex_create(&bound->mp_lock,NULL); - bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type> arrayList_create(&bound->mp_parts); pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); @@ -457,10 +457,10 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to bound->service->send = pubsub_topicPublicationSend; bound->service->sendMultipart = NULL; //Multipart not supported (jet) for UDP - if (tp->serializerSvc != NULL){ - tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle); + //TODO check if lock on tp is needed? (e.g. is lock already done by caller?) + if (tp->serializerSvc != NULL) { + tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map); } - } else { @@ -474,30 +474,33 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to return bound; } -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) { celixThreadMutex_lock(&boundSvc->mp_lock); - if(boundSvc->service != NULL){ + if (boundSvc->service != NULL) { free(boundSvc->service); } - if(boundSvc->msgTypes != NULL){ - if (boundSvc->parent->serializerSvc != NULL){ - boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes); - } - hashMap_destroy(boundSvc->msgTypes,false,false); - } + //TODO check if lock on parent is needed, e.g. does the caller already lock? + if (boundSvc->map != NULL) { + if (boundSvc->parent->serializerSvc == NULL) { + printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n"); + } else { + boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map); + boundSvc->map = NULL; + } + } - if(boundSvc->mp_parts!=NULL){ + if (boundSvc->mp_parts!=NULL) { arrayList_destroy(boundSvc->mp_parts); } - if(boundSvc->scope!=NULL){ + if (boundSvc->scope!=NULL) { free(boundSvc->scope); } - if(boundSvc->topic!=NULL){ + if (boundSvc->topic!=NULL) { free(boundSvc->topic); } http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c index 91bce9f..da23b21 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c @@ -49,7 +49,6 @@ #include "topic_subscription.h" #include "subscriber.h" #include "publisher.h" -#include "dyn_msg_utils.h" #include "pubsub_publish_service_private.h" #include "large_udp.h" @@ -68,11 +67,15 @@ struct topic_subscription{ celix_thread_mutex_t ts_lock; bundle_context_pt context; int topicEpollFd; // EPOLL filedescriptor where the sockets are registered. - hash_map_pt servicesMap; // key = service, value = msg types map + + //NOTE. using a service ptr can be dangerous, because pointer can be reused. + //ensuring that pointer are removed before new (refurbish) pionter comes along is crucial! + hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t* + hash_map_pt socketMap; // key = URL, value = listen-socket unsigned int nrSubscribers; largeUdp_pt largeUdpHandle; - pubsub_serializer_service_pt serializerSvc; + pubsub_serializer_service_t* serializerSvc; }; @@ -94,7 +97,7 @@ static void sigusr1_sighandler(int signo); static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); -celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){ +celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); @@ -115,7 +118,7 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl celixThreadMutex_create(&ts->ts_lock,NULL); arrayList_create(&ts->sub_ep_list); - ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL); ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS); @@ -161,7 +164,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ serviceTracker_destroy(ts->tracker); arrayList_clear(ts->sub_ep_list); arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->servicesMap,false,false); + hashMap_destroy(ts->msgSerializersMap,false,false); hashMap_destroy(ts->socketMap,false,false); largeUdp_destroy(ts->largeUdpHandle); @@ -391,7 +394,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { return ts->nrSubscribers; } -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); @@ -403,50 +406,39 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p return status; } -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - - hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap); - while(hashMapIterator_hasNext(svc_iter)){ - hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter); - if (hashMap_size(msgTypes) > 0){ - ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes); + if (ts->serializerSvc == serializerSvc) { //only act if svc removed is services used + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter); + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); } + ts->serializerSvc = NULL; } - hashMapIterator_destroy(svc_iter); - - ts->serializerSvc = NULL; - celixThreadMutex_unlock(&ts->ts_lock); return status; } -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type - + if (!hashMap_containsKey(ts->msgSerializersMap, svc)) { bundle_pt bundle = NULL; serviceReference_getBundle(reference, &bundle); - if (ts->serializerSvc != NULL){ - ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle); - } - - if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber - hashMap_destroy(msgTypes,false,false); - printf("TS: Unsupported subscriber!\n"); - } - else{ - hashMap_put(ts->servicesMap, service, msgTypes); + if (ts->serializerSvc != NULL) { + pubsub_msg_serializer_map_t* map = NULL; + ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); + if (map != NULL) { + hashMap_put(ts->msgSerializersMap, svc, map); + } } - } celixThreadMutex_unlock(&ts->ts_lock); printf("TS: New subscriber registered.\n"); @@ -454,18 +446,16 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc } -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; - celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); - if(msgTypes!=NULL){ - if (ts->serializerSvc != NULL){ - ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes); - } - hashMap_destroy(msgTypes,false,false); + + celixThreadMutex_lock(&ts->ts_lock); + if (hashMap_containsKey(ts->msgSerializersMap, svc)) { + pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc); + if (ts->serializerSvc != NULL){ + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); } } celixThreadMutex_unlock(&ts->ts_lock); @@ -475,59 +465,51 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere } -static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){ +static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){ - hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap); + celixThreadMutex_lock(&sub->ts_lock); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - hash_map_pt msgTypes = hashMapEntry_getValue(entry); + pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - pubsub_message_type *msgType = hashMap_get(msgTypes,&(msg->header.type)); + pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void *)(uintptr_t )msg->header.type); - if (msgType == NULL) { + if (msgSer == NULL) { printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type); - } - else if (sub->serializerSvc == NULL){ - printf("TS: No active serializer service found!\n"); - } - else{ + } else { void *msgInst = NULL; - char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType); - version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType); - - bool validVersion = checkVersion(msgVersion,&msg->header); - + bool validVersion = checkVersion(msgSer->msgVersion, &msg->header); if(validVersion){ - celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) msg->payload, &msgInst); - + celix_status_t status = msgSer->deserialize(msgSer->handle, msg->payload, 0, &msgInst); if (status == CELIX_SUCCESS) { bool release = true; pubsub_multipart_callbacks_t mp_callbacks; - mp_callbacks.handle = sub; + mp_callbacks.handle = map; mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; mp_callbacks.getMultipart = NULL; - subsvc->receive(subsvc->handle, name, msg->header.type, msgInst, &mp_callbacks, &release); - if(release){ - sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst); + subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release); + if (release) { + msgSer->freeMsg(msgSer->handle, msgInst); } } else{ - printf("TS: Cannot deserialize msgType %s.\n",name); + printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName); } } - else{ + else { int major=0,minor=0; - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,msg->header.major,msg->header.minor); + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); + printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName, major, minor, msg->header.major, msg->header.minor); } - } } - hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&sub->ts_lock); } static void* udp_recv_thread_func(void * arg) { @@ -555,7 +537,7 @@ static void* udp_recv_thread_func(void * arg) { unsigned int size; if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) { // Handle data - pubsub_udp_msg_pt udpMsg = NULL; + pubsub_udp_msg_t* udpMsg = NULL; if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) { printf("TS: ERROR largeUdp_read with index %d\n", index); continue; @@ -577,19 +559,19 @@ static void* udp_recv_thread_func(void * arg) { } -static void sigusr1_sighandler(int signo){ +static void sigusr1_sighandler(int signo) { printf("TS: Topic subscription being shut down...\n"); return; } -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) { bool check=false; int major=0,minor=0; - if(msgVersion!=NULL){ + if (msgVersion!=NULL) { version_getMajor(msgVersion,&major); version_getMinor(msgVersion,&minor); - if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ + if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */ check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ } } @@ -597,8 +579,24 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ return check; } -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = utils_stringHash(msgType); - return 0; +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* out) { + pubsub_msg_serializer_map_t* map = handle; + hash_map_iterator_t iter = hashMapIterator_construct(map->serializers); + unsigned int msgTypeId = 0; + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); + if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) { + msgTypeId = msgSer->msgId; + break; + } + } + + if (msgTypeId == 0) { + printf("Cannot find msg type id for msgType %s\n", msgType); + return -1; + } else { + *out = msgTypeId; + return 0; + } } http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt index 956830d..49eba87 100644 --- a/pubsub/pubsub_admin_zmq/CMakeLists.txt +++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt @@ -52,7 +52,6 @@ if (BUILD_PUBSUB_PSA_ZMQ) ${ZMQ_CRYPTO_C} ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c ) http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h index 2f81bff..3c36986 100644 --- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h +++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h @@ -49,7 +49,7 @@ struct pubsub_admin { - pubsub_serializer_service_pt serializerSvc; + pubsub_serializer_service_t* serializerSvc; bundle_context_pt bundle_context; log_helper_pt loghelper; @@ -89,7 +89,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc); -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc); +celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); #endif /* PUBSUB_ADMIN_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h index b6b76c6..158dfe7 100644 --- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h +++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h @@ -34,14 +34,14 @@ typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc); -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc); +celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h index c6fe93a..1fbbaaf 100644 --- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h +++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h @@ -38,7 +38,7 @@ typedef struct topic_subscription* topic_subscription_pt; -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out); +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out); celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); @@ -51,8 +51,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc); -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc); +celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c index 09fcd8c..5c9a5d5 100644 --- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c @@ -666,7 +666,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin return status; } -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; admin->serializerSvc = serializerSvc; @@ -694,7 +694,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize return status; } -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; admin->serializerSvc = NULL; http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c index bb8ff56..2e95874 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c @@ -47,7 +47,6 @@ #include "version.h" #include "pubsub_common.h" -#include "dyn_msg_utils.h" #include "pubsub_utils.h" #include "publisher.h" @@ -70,7 +69,7 @@ struct topic_publication { array_list_pt pub_ep_list; //List<pubsub_endpoint> hash_map_pt boundServices; //<bundle_pt,bound_service> celix_thread_mutex_t tp_lock; - pubsub_serializer_service_pt serializerSvc; + pubsub_serializer_service_t* serializerSvc; }; typedef struct publish_bundle_bound_service { @@ -78,26 +77,26 @@ typedef struct publish_bundle_bound_service { pubsub_publisher_pt service; bundle_pt bundle; char *topic; - hash_map_pt msgTypes; + pubsub_msg_serializer_map_t* map; unsigned short getCount; celix_thread_mutex_t mp_lock; bool mp_send_in_progress; array_list_pt mp_parts; -}* publish_bundle_bound_service_pt; +} publish_bundle_bound_service_t; -typedef struct pubsub_msg{ +typedef struct pubsub_msg { pubsub_msg_header_pt header; char* payload; int payloadSize; -}* pubsub_msg_pt; +} pubsub_msg_t; static unsigned int rand_range(unsigned int min, unsigned int max); static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); +static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc); static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags); @@ -105,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY @@ -235,7 +234,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); + publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter); pubsub_destroyPublishBundleBoundService(bound); } hashMapIterator_destroy(iter); @@ -332,43 +331,50 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&(pub->tp_lock)); - pub->serializerSvc = serializerSvc; - - hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(bs_iter)){ - publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter); - if (hashMap_size(boundSvc->msgTypes) == 0){ - pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle); + //clearing pref serializer + if (pub->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); + pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + bound->map = NULL; } } - hashMapIterator_destroy(bs_iter); + + pub->serializerSvc = serializerSvc; + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + bundle_pt bundle = hashMapEntry_getKey(entry); + publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry); + pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map); + } celixThreadMutex_unlock(&(pub->tp_lock)); return status; } -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){ celix_status_t status = CELIX_SUCCESS; - celixThreadMutex_lock(&(pub->tp_lock)); - hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(bs_iter)){ - publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter); - pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes); + if (pub->serializerSvc == svc) { + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter); + pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map); + boundSvc->map = NULL; + } + pub->serializerSvc = NULL; } - hashMapIterator_destroy(bs_iter); - - pub->serializerSvc = NULL; celixThreadMutex_unlock(&(pub->tp_lock)); - return status; } @@ -384,7 +390,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle); if(bound==NULL){ bound = pubsub_createPublishBundleBoundService(publish,bundle); if(bound!=NULL){ @@ -410,7 +416,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle); if(bound!=NULL){ bound->getCount--; @@ -434,7 +440,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p return CELIX_SUCCESS; } -static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){ +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){ bool ret = true; @@ -474,7 +480,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){ unsigned int i = 0; unsigned int mp_num = arrayList_size(mp_msg_parts); for(;i<mp_num;i++){ - ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1)); + ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_t*)arrayList_get(mp_msg_parts,i), (i==mp_num-1)); } arrayList_clear(mp_msg_parts); @@ -489,10 +495,8 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con } static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){ - int status = 0; - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; + publish_bundle_bound_service_t* bound = handle; celixThreadMutex_lock(&(bound->mp_lock)); if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg @@ -501,38 +505,33 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy return -3; } - pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId); - + pubsub_msg_serializer_t* msgSer = NULL; + if (bound->map != NULL) { + msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId); + } int major=0, minor=0; - if (msgType != NULL && bound->parent->serializerSvc != NULL) { - - version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType); - + if (msgSer != NULL) { pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); - strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); - msg_hdr->type = msgTypeId; - if (msgVersion != NULL){ - version_getMajor(msgVersion, &major); - version_getMinor(msgVersion, &minor); + if (msgSer->msgVersion != NULL){ + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); msg_hdr->major = major; msg_hdr->minor = minor; } - void* serializedOutput = NULL; - int serializedOutputLen = 0; - bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen); - - pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); + char* serializedOutput = NULL; + size_t serializedOutputLen = 0; + msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen); + pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg)); msg->header = msg_hdr; msg->payload = (char *) serializedOutput; msg->payloadSize = serializedOutputLen; - bool snd = true; - switch(flags){ + switch (flags) { case PUBSUB_PUBLISHER_FIRST_MSG: bound->mp_send_in_progress = true; arrayList_add(bound->mp_parts,msg); @@ -602,9 +601,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){ } -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ +static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ - publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); + publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); if (bound != NULL) { bound->service = calloc(1, sizeof(*bound->service)); @@ -617,7 +616,11 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to bound->getCount = 1; bound->mp_send_in_progress = false; celixThreadMutex_create(&bound->mp_lock,NULL); - bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type> + + //TODO check if lock is needed. e.g. was the caller already locked? + if (tp->serializerSvc != NULL) { + tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map); + } arrayList_create(&bound->mp_parts); pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); @@ -628,10 +631,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to bound->service->send = pubsub_topicPublicationSend; bound->service->sendMultipart = pubsub_topicPublicationSendMultipart; - if (tp->serializerSvc != NULL){ - tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle); - } - } else { @@ -645,26 +644,24 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to return bound; } -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){ celixThreadMutex_lock(&boundSvc->mp_lock); - if(boundSvc->service != NULL){ + if (boundSvc->service != NULL) { free(boundSvc->service); } - if(boundSvc->msgTypes != NULL){ - if (boundSvc->parent->serializerSvc != NULL){ - boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes); - } - hashMap_destroy(boundSvc->msgTypes,false,false); + if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) { + boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map); + boundSvc->map = NULL; } - if(boundSvc->mp_parts!=NULL){ + if (boundSvc->mp_parts!=NULL) { arrayList_destroy(boundSvc->mp_parts); } - if(boundSvc->topic!=NULL){ + if (boundSvc->topic!=NULL) { free(boundSvc->topic); } http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c index 3de56af..7ef2c5d 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c @@ -46,7 +46,6 @@ #include "subscriber.h" #include "publisher.h" -#include "dyn_msg_utils.h" #include "pubsub_utils.h" #ifdef BUILD_WITH_ZMQ_SECURITY @@ -58,8 +57,7 @@ #define POLL_TIMEOUT 250 #define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" -struct topic_subscription{ - +struct topic_subscription { zsock_t* zmq_socket; zcert_t * zmq_cert; zcert_t * zmq_pub_cert; @@ -71,26 +69,25 @@ struct topic_subscription{ celix_thread_mutex_t ts_lock; bundle_context_pt context; - hash_map_pt servicesMap; // key = service, value = msg types map + hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t* array_list_pt pendingConnections; array_list_pt pendingDisconnections; celix_thread_mutex_t pendingConnections_lock; celix_thread_mutex_t pendingDisconnections_lock; unsigned int nrSubscribers; - pubsub_serializer_service_pt serializerSvc; - + pubsub_serializer_service_t* serializerSvc; }; -typedef struct complete_zmq_msg{ +typedef struct complete_zmq_msg { zframe_t* header; zframe_t* payload; }* complete_zmq_msg_pt; -typedef struct mp_handle{ - hash_map_pt svc_msg_db; +typedef struct mp_handle { + pubsub_msg_serializer_map_t* map; hash_map_pt rcv_msg_map; -}* mp_handle_pt; +} mp_handle_t; typedef struct msg_map_entry{ bool retain; @@ -104,12 +101,12 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); static void sigusr1_sighandler(int signo); static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); -static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list); -static void destroy_mp_handle(mp_handle_pt mp_handle); +static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list); +static void destroy_mp_handle(mp_handle_t* mp_handle); static void connectPendingPublishers(topic_subscription_pt sub); static void disconnectPendingPublishers(topic_subscription_pt sub); -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){ +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){ celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY @@ -223,7 +220,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, celixThreadMutex_create(&ts->socket_lock, NULL); celixThreadMutex_create(&ts->ts_lock,NULL); arrayList_create(&ts->sub_ep_list); - ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL); arrayList_create(&ts->pendingConnections); arrayList_create(&ts->pendingDisconnections); celixThreadMutex_create(&ts->pendingConnections_lock, NULL); @@ -269,7 +266,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ serviceTracker_destroy(ts->tracker); arrayList_clear(ts->sub_ep_list); arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->servicesMap,false,false); + hashMap_destroy(ts->msgSerializers,false,false); celixThreadMutex_lock(&ts->pendingConnections_lock); arrayList_destroy(ts->pendingConnections); @@ -429,7 +426,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { return ts->nrSubscribers; } -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); @@ -441,22 +438,18 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p return status; } -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){ +celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* svc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - - hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap); - while(hashMapIterator_hasNext(svc_iter)){ - hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter); - if (hashMap_size(msgTypes) > 0){ - ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes); + if (ts->serializerSvc == svc) { + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers); + while(hashMapIterator_hasNext(&iter)){ + pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter); + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); } } - hashMapIterator_destroy(svc_iter); - ts->serializerSvc = NULL; - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -467,24 +460,19 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type - + if (!hashMap_containsKey(ts->msgSerializers, service)) { bundle_pt bundle = NULL; serviceReference_getBundle(reference, &bundle); - if (ts->serializerSvc != NULL){ - ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle); - } - - if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber - hashMap_destroy(msgTypes,false,false); - printf("TS: Unsupported subscriber!\n"); - } - else{ - hashMap_put(ts->servicesMap, service, msgTypes); + if (ts->serializerSvc != NULL) { + pubsub_msg_serializer_map_t* map = NULL; + ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); + if (map != NULL) { + hashMap_put(ts->msgSerializers, service, map); + } else { + printf("TS: Cannot create msg serializer map\n"); + } } - } celixThreadMutex_unlock(&ts->ts_lock); printf("TS: New subscriber registered.\n"); @@ -497,14 +485,9 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); - if(msgTypes!=NULL){ - if (ts->serializerSvc != NULL){ - ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes); - } - hashMap_destroy(msgTypes,false,false); - } + if (hashMap_containsKey(ts->msgSerializers, service)) { + pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service); + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); } celixThreadMutex_unlock(&ts->ts_lock); @@ -513,66 +496,55 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere } -static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ +static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) { pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); - hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - hash_map_pt msgTypes = hashMapEntry_getValue(entry); + pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type)); - if (msgType == NULL) { - printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type); - } - else if (sub->serializerSvc == NULL){ - printf("TS: No active serializer found!\n"); - } - else{ + pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t )first_msg_hdr->type); + if (msgSer == NULL) { + printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n", first_msg_hdr->type); + } else { void *msgInst = NULL; - char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType); - version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType); - - bool validVersion = checkVersion(msgVersion,first_msg_hdr); - + bool validVersion = checkVersion(msgSer->msgVersion, first_msg_hdr); if(validVersion){ - - celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst); + celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst); if (status == CELIX_SUCCESS) { bool release = true; - mp_handle_pt mp_handle = create_mp_handle(sub, msgTypes,msg_list); + mp_handle_t* mp_handle = create_mp_handle(sub, map, msg_list); pubsub_multipart_callbacks_t mp_callbacks; mp_callbacks.handle = mp_handle; mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; mp_callbacks.getMultipart = pubsub_getMultipart; - subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release); + subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release); - if(release){ - sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst); + if (release) { + msgSer->freeMsg(msgSer->handle, msgInst); } - if(mp_handle!=NULL){ + if (mp_handle!=NULL) { destroy_mp_handle(mp_handle); } } else{ - printf("TS: Cannot deserialize msgType %s.\n",name); + printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName); } - } - else{ + } else { int major=0,minor=0; - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor); + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); + printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName, major, minor, first_msg_hdr->major, first_msg_hdr->minor); } - } } - hashMapIterator_destroy(iter); int i = 0; for(;i<arrayList_size(msg_list);i++){ @@ -737,7 +709,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain return -1; } - mp_handle_pt mp_handle = (mp_handle_pt)handle; + mp_handle_t* mp_handle = handle; msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId); if(entry!=NULL){ entry->retain = retain; @@ -753,63 +725,55 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain } -static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){ +static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) { if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message return NULL; } - mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle)); - mp_handle->svc_msg_db = svc_msg_db; - mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL); - - int i=1; //We skip the first message, it will be handle differently - for(;i<arrayList_size(rcv_msg_list);i++){ - complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i); + mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle)); + mp_handle->map = map; + mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL); + int i; //We skip the first message, it will be handle differently + for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) { + complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i); pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header); - pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type)); - if (msgType != NULL && sub->serializerSvc != NULL) { + pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t)(header->type)); + if (msgSer != NULL) { void *msgInst = NULL; - version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType); - - bool validVersion = checkVersion(msgVersion,header); - + bool validVersion = checkVersion(msgSer->msgVersion, header); if(validVersion){ - celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(c_msg->payload), &msgInst); + //TODO make the getMultipart lazy? + celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 0, &msgInst); if(status == CELIX_SUCCESS){ - unsigned int* msgId = calloc(1,sizeof(unsigned int)); - *msgId = header->type; msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry)); entry->msgInst = msgInst; - hashMap_put(mp_handle->rcv_msg_map,msgId,entry); + hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)(header->type), entry); } } } - } - return mp_handle; - } -static void destroy_mp_handle(mp_handle_pt mp_handle){ +static void destroy_mp_handle(mp_handle_t* mp_handle){ hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map); while(hashMapIterator_hasNext(iter)){ hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry); + unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry); msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry); - pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId); - if(msgType!=NULL){ - if(!msgEntry->retain){ - free(msgEntry->msgInst); + pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId); + if (msgSer != NULL) { + if (!msgEntry->retain) { + msgSer->freeMsg(msgSer->handle, msgEntry->msgInst); } } else{ - printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId); + printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n", msgId); } } hashMapIterator_destroy(iter); http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/dyn_msg_utils.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/pubsub/pubsub_common/public/include/dyn_msg_utils.h deleted file mode 100644 index 71085ab..0000000 --- a/pubsub/pubsub_common/public/include/dyn_msg_utils.h +++ /dev/null @@ -1,39 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * dyn_msg_utils.h - * - * \date Nov 11, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef DYN_MSG_UTILS_H_ -#define DYN_MSG_UTILS_H_ - -#include "bundle.h" -#include "hash_map.h" - -unsigned int uintHash(const void * uintNum); -int uintEquals(const void * uintNum, const void * toCompare); - -void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle); -void emptyMsgTypesMap(hash_map_pt msgTypesMap); - -#endif /* DYN_MSG_UTILS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h index 52cb75c..f7ab7e0 100644 --- a/pubsub/pubsub_common/public/include/pubsub_admin.h +++ b/pubsub/pubsub_common/public/include/pubsub_admin.h @@ -56,8 +56,8 @@ struct pubsub_admin_service { celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); - celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc); - celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc); + celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); + celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); }; http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_serializer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h index f2df075..e9f9f6c 100644 --- a/pubsub/pubsub_common/public/include/pubsub_serializer.h +++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h @@ -24,33 +24,44 @@ * \copyright Apache License, Version 2.0 */ -#ifndef PUBSUB_SERIALIZER_H_ -#define PUBSUB_SERIALIZER_H_ +#ifndef PUBSUB_SERIALIZER_SERVICE_H_ +#define PUBSUB_SERIALIZER_SERVICE_H_ #include "service_reference.h" #include "pubsub_common.h" -typedef struct _pubsub_message_type pubsub_message_type; - -typedef struct pubsub_serializer *pubsub_serializer_pt; - -struct pubsub_serializer_service { +/** + * There should be a pubsub_serializer_t + * per msg type (msg id) per bundle + * + * The pubsub_serializer_service can create + * a serializer_map per bundle. Potentially using + * the extender pattern. + */ +typedef struct pubsub_msg_serializer { + void* handle; + unsigned int msgId; + const char* msgName; + version_pt msgVersion; - pubsub_serializer_pt serializer; + celix_status_t (*serialize)(void* handle, const void* input, char** out, size_t* outLen); + celix_status_t (*deserialize)(void* handle, const char* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed - celix_status_t (*serialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen); - celix_status_t (*deserialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output); + void (*freeMsg)(void* handle, void* msg); +} pubsub_msg_serializer_t; - void (*fillMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle); - void (*emptyMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap); +typedef struct pubsub_msg_serializer_map { + bundle_pt bundle; + hash_map_pt serializers; //key = msg id (unsigned int), value = pubsub_serializer_t* +} pubsub_msg_serializer_map_t; - version_pt (*getVersion)(pubsub_serializer_pt serializer, pubsub_message_type *msgType); - char* (*getName)(pubsub_serializer_pt serializer, pubsub_message_type *msgType); - void (*freeMsg)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg); +typedef struct pubsub_serializer_service { + void* handle; -}; + celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, pubsub_msg_serializer_map_t** out); + celix_status_t (*destroySerializerMap)(void* handle, pubsub_msg_serializer_map_t* map); -typedef struct pubsub_serializer_service *pubsub_serializer_service_pt; +} pubsub_serializer_service_t; -#endif /* PUBSUB_SERIALIZER_H_ */ +#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */
