Repository: celix
Updated Branches:
  refs/heads/develop 6cd9df7ec -> 4b8222dc1

Added shell command to pubsub to get overview of pubs and subs, fixed some code warnings

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/4b8222dc
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/4b8222dc
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/4b8222dc

Branch: refs/heads/develop
Commit: 4b8222dc11e7ae6aa75d5648bfd940e72518f755
Parents: 6cd9df7
Author: Erjan Altena <erjanalt...@gmail.com>
Authored: Tue Feb 6 21:11:07 2018 +0100
Committer: Erjan Altena <erjanalt...@gmail.com>
Committed: Tue Feb 6 21:11:50 2018 +0100

----------------------------------------------------------------------
 pubsub/pubsub_topology_manager/CMakeLists.txt | 2 +-
 .../src/pubsub_topology_manager.c             | 96 ++++++++++++++------
 .../src/pubsub_topology_manager.h             | 5 +
 3 files changed, 72 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt
index 7ae00f6..9cb452b 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -23,7 +23,7 @@ add_celix_bundle(celix_pubsub_topology_manager
     src/pubsub_topology_manager.c
     src/pubsub_topology_manager.h
 )
-target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi)
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi Celix::shell_api)
 get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
 celix_bundle_files(celix_pubsub_topology_manager
     ${DESC}

http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index a63b275..71a9ad9 100644 --- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -32,22 +32,53 @@ #include "array_list.h" #include "bundle_context.h" #include "constants.h" -#include "module.h" -#include "bundle.h" -#include "filter.h" #include "listener_hook_service.h" #include "utils.h" -#include "service_reference.h" -#include "service_registration.h" #include "log_service.h" #include "log_helper.h" #include "publisher_endpoint_announce.h" #include "pubsub_topology_manager.h" -#include "pubsub_endpoint.h" #include "pubsub_admin.h" -#include "pubsub_utils.h" +static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) { + for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) { + const char* key = (const char*)hashMapIterator_nextKey(&iter); + fprintf(outStream, " Topic=%s\n", key); + array_list_pt ep_list = hashMap_get(endpoints, key); + for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) { + pubsub_endpoint_pt ep = arrayList_get(ep_list, i); + fprintf(outStream, " Endpoint %d\n", i); + fprintf(outStream, " Endpoint properties\n"); + const char *propKey; + if(ep->endpoint_props) { + PROPERTIES_FOR_EACH(ep->endpoint_props, propKey) { + fprintf(outStream, " %s => %s\n", propKey, properties_get(ep->endpoint_props, propKey)); + } + } + if(ep->topic_props) { + fprintf(outStream, " Topic properties\n"); + PROPERTIES_FOR_EACH(ep->topic_props, propKey) { + fprintf(outStream, " %s => %s\n", propKey, properties_get(ep->topic_props, propKey)); + } + } + } + } + +} + +static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { + pubsub_topology_manager_pt manager = 
(pubsub_topology_manager_pt) handle; + if (manager->publications && !hashMap_isEmpty(manager->publications)) { + fprintf(outStream, "Publications:\n"); + print_endpoint_info(manager->publications, outStream); + } + if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) { + fprintf(outStream, "Subscriptions:\n"); + print_endpoint_info(manager->subscriptions, outStream); + } + return CELIX_SUCCESS; +} celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) { celix_status_t status = CELIX_SUCCESS; @@ -62,12 +93,12 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help celix_thread_mutexattr_t psaAttr; celixThreadMutexAttr_create(&psaAttr); celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); - status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr); + status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr); celixThreadMutexAttr_destroy(&psaAttr); - status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL); - status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL); - status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL); + status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL); + status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL); + status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL); arrayList_create(&(*manager)->psaList); @@ -76,7 +107,14 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*manager)->loghelper = logHelper; - + (*manager)->shellCmdService.handle = *manager; + (*manager)->shellCmdService.executeCommand = shellCommand; + + properties_pt shellProps = properties_create(); + properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info"); + properties_set(shellProps, 
OSGI_SHELL_COMMAND_USAGE, "ps_info"); + properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub"); + bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg)); return status; } @@ -97,7 +135,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); while(hashMapIterator_hasNext(pubit)){ array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit); - int i; + unsigned int i; for(i=0;i<arrayList_size(l);i++){ pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); } @@ -112,7 +150,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); while(hashMapIterator_hasNext(subit)){ array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit); - int i; + unsigned int i; for(i=0;i<arrayList_size(l);i++){ pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); } @@ -122,7 +160,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager hashMap_destroy(manager->subscriptions, true, false); celixThreadMutex_unlock(&manager->subscriptionsLock); celixThreadMutex_destroy(&manager->subscriptionsLock); - + serviceRegistration_unregister(manager->shellCmdReg); free(manager); return status; @@ -131,7 +169,7 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) { celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_pt manager = handle; - int i; + unsigned int i; pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service; logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA"); @@ -211,7 +249,7 @@ celix_status_t 
pubsub_topologyManager_psaRemoved(void * handle, service_referenc bundleContext_getService(manager->context, disc_sr, (void**) &disc); const char* fwUUID = NULL; bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - int i; + unsigned int i; for(i=0;i<arrayList_size(pubEP_list);i++){ pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){ @@ -273,7 +311,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref celixThreadMutex_unlock(&manager->subscriptionsLock); - int j; + unsigned int j; double score = 0; double best_score = 0; pubsub_admin_service_pt best_psa = NULL; @@ -328,7 +366,7 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r pubsub_endpoint_pt subcmp = NULL; if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){ - int j,k; + unsigned int j,k; // Inform discoveries that we not interested in the topic any more celixThreadMutex_lock(&manager->discoveryListLock); @@ -411,7 +449,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service hash_map_iterator_pt iter = hashMapIterator_create(manager->publications); while(hashMapIterator_hasNext(iter)){ array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); - for(int i = 0; i < arrayList_size(pubEP_list); i++) { + for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) { pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){ status += disc->announcePublisher(disc->handle,pubEP); @@ -427,7 +465,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service while(hashMapIterator_hasNext(iter)) { array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(iter); - int i; + unsigned int i; for(i=0;i<arrayList_size(l);i++){ pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); @@ -441,9 +479,7 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service } celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service); + celix_status_t status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service); if (status == CELIX_SUCCESS) { status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service); } @@ -474,7 +510,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_ celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_pt manager = handle; - int l_index; + unsigned int l_index; for (l_index = 0; l_index < arrayList_size(listeners); l_index++) { @@ -495,7 +531,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_ celixThreadMutex_unlock(&manager->publicationsLock); - int j; + unsigned int j; double score = 0; double best_score = 0; pubsub_admin_service_pt best_psa = NULL; @@ -542,7 +578,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_pt manager = handle; - int l_index; + unsigned int l_index; for (l_index = 0; l_index < arrayList_size(listeners); l_index++) { @@ -552,7 +588,7 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){ - int j,k; + unsigned int j,k; celixThreadMutex_lock(&manager->psaListLock); celixThreadMutex_lock(&manager->publicationsLock); @@ -638,7 +674,7 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, 
pubsub_end pubsubEndpoint_clone(pubEP, &p); arrayList_add(pub_list_by_topic,p); - int j; + unsigned int j; double score = 0; double best_score = 0; pubsub_admin_service_pt best_psa = NULL; @@ -675,7 +711,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo pubsub_topology_manager_pt manager = handle; celixThreadMutex_lock(&manager->psaListLock); celixThreadMutex_lock(&manager->publicationsLock); - int i; + unsigned int i; char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h index 0074a75..cdcc651 100644 --- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h +++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h @@ -30,6 +30,7 @@ #include "service_reference.h" #include "bundle_context.h" #include "log_helper.h" +#include "command.h" #include "pubsub_common.h" #include "pubsub_endpoint.h" @@ -52,6 +53,10 @@ struct pubsub_topology_manager { celix_thread_mutex_t subscriptionsLock; hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>> + command_service_t shellCmdService; + service_registration_pt shellCmdReg; + + log_helper_pt loghelper; };