Repository: celix
Updated Branches:
  refs/heads/develop 6cd9df7ec -> 4b8222dc1


Added shell command to pubsub to get overview of pubs and subs, fixed some code 
warnings


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/4b8222dc
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/4b8222dc
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/4b8222dc

Branch: refs/heads/develop
Commit: 4b8222dc11e7ae6aa75d5648bfd940e72518f755
Parents: 6cd9df7
Author: Erjan Altena <erjanalt...@gmail.com>
Authored: Tue Feb 6 21:11:07 2018 +0100
Committer: Erjan Altena <erjanalt...@gmail.com>
Committed: Tue Feb 6 21:11:50 2018 +0100

----------------------------------------------------------------------
 pubsub/pubsub_topology_manager/CMakeLists.txt   |  2 +-
 .../src/pubsub_topology_manager.c               | 96 ++++++++++++++------
 .../src/pubsub_topology_manager.h               |  5 +
 3 files changed, 72 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt 
b/pubsub/pubsub_topology_manager/CMakeLists.txt
index 7ae00f6..9cb452b 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -23,7 +23,7 @@ add_celix_bundle(celix_pubsub_topology_manager
        src/pubsub_topology_manager.c
        src/pubsub_topology_manager.h
 )
-target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework 
Celix::log_helper Celix::pubsub_spi)
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework 
Celix::log_helper Celix::pubsub_spi Celix::shell_api)
 get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
 celix_bundle_files(celix_pubsub_topology_manager
        ${DESC}

http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c 
b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index a63b275..71a9ad9 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -32,22 +32,53 @@
 #include "array_list.h"
 #include "bundle_context.h"
 #include "constants.h"
-#include "module.h"
-#include "bundle.h"
-#include "filter.h"
 #include "listener_hook_service.h"
 #include "utils.h"
-#include "service_reference.h"
-#include "service_registration.h"
 #include "log_service.h"
 #include "log_helper.h"
 
 #include "publisher_endpoint_announce.h"
 #include "pubsub_topology_manager.h"
-#include "pubsub_endpoint.h"
 #include "pubsub_admin.h"
-#include "pubsub_utils.h"
 
+static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
+       for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); 
hashMapIterator_hasNext(&iter);) {
+               const char* key = (const char*)hashMapIterator_nextKey(&iter);
+               fprintf(outStream, "    Topic=%s\n", key);
+               array_list_pt ep_list = hashMap_get(endpoints, key);
+               for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
+                       pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
+                       fprintf(outStream, "        Endpoint %d\n", i);
+                       fprintf(outStream, "            Endpoint properties\n");
+                       const char *propKey;
+                       if(ep->endpoint_props) {
+                               PROPERTIES_FOR_EACH(ep->endpoint_props, 
propKey) {
+                                       fprintf(outStream, "                %s 
=> %s\n", propKey, properties_get(ep->endpoint_props, propKey));
+                               }
+                       }
+                       if(ep->topic_props) {
+                               fprintf(outStream, "            Topic 
properties\n");
+                               PROPERTIES_FOR_EACH(ep->topic_props, propKey) {
+                                       fprintf(outStream, "                %s 
=> %s\n", propKey, properties_get(ep->topic_props, propKey));
+                               }
+                       }
+               }
+       }
+
+}
+
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE 
*outStream, FILE *errorStream) {
+       pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt) 
handle;
+       if (manager->publications && !hashMap_isEmpty(manager->publications)) {
+               fprintf(outStream, "Publications:\n");
+               print_endpoint_info(manager->publications, outStream);
+       }
+       if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) 
{
+               fprintf(outStream, "Subscriptions:\n");
+               print_endpoint_info(manager->subscriptions, outStream);
+       }
+       return CELIX_SUCCESS;
+}
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, 
log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
        celix_status_t status = CELIX_SUCCESS;
@@ -62,12 +93,12 @@ celix_status_t 
pubsub_topologyManager_create(bundle_context_pt context, log_help
        celix_thread_mutexattr_t psaAttr;
        celixThreadMutexAttr_create(&psaAttr);
        celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-       status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+       status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
        celixThreadMutexAttr_destroy(&psaAttr);
 
-       status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
-       status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
-       status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+       status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+       status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
+       status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
 
        arrayList_create(&(*manager)->psaList);
 
@@ -76,7 +107,14 @@ celix_status_t 
pubsub_topologyManager_create(bundle_context_pt context, log_help
        (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
        (*manager)->loghelper = logHelper;
-
+       (*manager)->shellCmdService.handle = *manager;
+       (*manager)->shellCmdService.executeCommand = shellCommand;
+
+       properties_pt shellProps = properties_create();
+       properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
+       properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
+       properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: 
Overview of PubSub");
+       bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, 
&((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
        return status;
 }
 
@@ -97,7 +135,7 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
        hash_map_iterator_pt pubit = 
hashMapIterator_create(manager->publications);
        while(hashMapIterator_hasNext(pubit)){
                array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(pubit);
-               int i;
+               unsigned int i;
                for(i=0;i<arrayList_size(l);i++){
                        
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
                }
@@ -112,7 +150,7 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
        hash_map_iterator_pt subit = 
hashMapIterator_create(manager->subscriptions);
        while(hashMapIterator_hasNext(subit)){
                array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(subit);
-               int i;
+               unsigned int i;
                for(i=0;i<arrayList_size(l);i++){
                        
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
                }
@@ -122,7 +160,7 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
        hashMap_destroy(manager->subscriptions, true, false);
        celixThreadMutex_unlock(&manager->subscriptionsLock);
        celixThreadMutex_destroy(&manager->subscriptionsLock);
-
+       serviceRegistration_unregister(manager->shellCmdReg);
        free(manager);
 
        return status;
@@ -131,7 +169,7 @@ celix_status_t 
pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 celix_status_t pubsub_topologyManager_psaAdded(void * handle, 
service_reference_pt reference, void * service) {
        celix_status_t status = CELIX_SUCCESS;
        pubsub_topology_manager_pt manager = handle;
-       int i;
+       unsigned int i;
 
        pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added 
PSA");
@@ -211,7 +249,7 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * 
handle, service_referenc
                                bundleContext_getService(manager->context, 
disc_sr, (void**) &disc);
                                const char* fwUUID = NULL;
                                
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-                               int i;
+                               unsigned int i;
                                for(i=0;i<arrayList_size(pubEP_list);i++){
                                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
                                        
if(strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
@@ -273,7 +311,7 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void 
* handle, service_ref
 
                celixThreadMutex_unlock(&manager->subscriptionsLock);
 
-               int j;
+               unsigned int j;
                double score = 0;
                double best_score = 0;
                pubsub_admin_service_pt best_psa = NULL;
@@ -328,7 +366,7 @@ celix_status_t 
pubsub_topologyManager_subscriberRemoved(void * handle, service_r
        pubsub_endpoint_pt subcmp = NULL;
        if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) 
== CELIX_SUCCESS){
 
-               int j,k;
+               unsigned int j,k;
 
                // Inform discoveries that we not interested in the topic any 
more
                celixThreadMutex_lock(&manager->discoveryListLock);
@@ -411,7 +449,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
        hash_map_iterator_pt iter = 
hashMapIterator_create(manager->publications);
        while(hashMapIterator_hasNext(iter)){
                array_list_pt pubEP_list = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               for(int i = 0; i < arrayList_size(pubEP_list); i++) {
+               for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
                        if( (strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && 
(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
                                status += 
disc->announcePublisher(disc->handle,pubEP);
@@ -427,7 +465,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
 
        while(hashMapIterator_hasNext(iter)) {
                array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               int i;
+               unsigned int i;
                for(i=0;i<arrayList_size(l);i++){
                        pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
 
@@ -441,9 +479,7 @@ celix_status_t 
pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
 }
 
 celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, 
reference, service);
+       celix_status_t status = 
pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
        if (status == CELIX_SUCCESS) {
                status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, 
reference, service);
        }
@@ -474,7 +510,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
        celix_status_t status = CELIX_SUCCESS;
        pubsub_topology_manager_pt manager = handle;
 
-       int l_index;
+       unsigned int l_index;
 
        for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 
@@ -495,7 +531,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 
                        celixThreadMutex_unlock(&manager->publicationsLock);
 
-                       int j;
+                       unsigned int j;
                        double score = 0;
                        double best_score = 0;
                        pubsub_admin_service_pt best_psa = NULL;
@@ -542,7 +578,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
        celix_status_t status = CELIX_SUCCESS;
        pubsub_topology_manager_pt manager = handle;
 
-       int l_index;
+       unsigned int l_index;
 
        for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 
@@ -552,7 +588,7 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
                if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) 
== CELIX_SUCCESS){
 
 
-                       int j,k;
+                       unsigned int j,k;
                        celixThreadMutex_lock(&manager->psaListLock);
                        celixThreadMutex_lock(&manager->publicationsLock);
 
@@ -638,7 +674,7 @@ celix_status_t 
pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
        pubsubEndpoint_clone(pubEP, &p);
        arrayList_add(pub_list_by_topic,p);
 
-       int j;
+       unsigned int j;
        double score = 0;
        double best_score = 0;
        pubsub_admin_service_pt best_psa = NULL;
@@ -675,7 +711,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void 
*handle, pubsub_endpo
        pubsub_topology_manager_pt manager = handle;
        celixThreadMutex_lock(&manager->psaListLock);
        celixThreadMutex_lock(&manager->publicationsLock);
-       int i;
+       unsigned int i;
 
        char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
        array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);

http://git-wip-us.apache.org/repos/asf/celix/blob/4b8222dc/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h 
b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 0074a75..cdcc651 100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -30,6 +30,7 @@
 #include "service_reference.h"
 #include "bundle_context.h"
 #include "log_helper.h"
+#include "command.h"
 
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
@@ -52,6 +53,10 @@ struct pubsub_topology_manager {
        celix_thread_mutex_t subscriptionsLock;
        hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
 
+       command_service_t shellCmdService;
+       service_registration_pt  shellCmdReg;
+
+
        log_helper_pt loghelper;
 };
 

Reply via email to