Author: bpetri
Date: Mon Jun 9 09:15:11 2014
New Revision: 1601328
URL: http://svn.apache.org/r1601328
Log:
CELIX-120: changed the structure of the shared data. The data itself now also
includes the framework UUID, and services are saved per framework.
Modified:
incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h
incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c
Modified:
incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h
URL:
http://svn.apache.org/viewvc/incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h?rev=1601328&r1=1601327&r2=1601328&view=diff
==============================================================================
---
incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h
(original)
+++
incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h
Mon Jun 9 09:15:11 2014
@@ -33,6 +33,12 @@
#define DISCOVERY_SEM_FILENAME "/dev/null"
#define DISCOVERY_SEM_FTOK_ID 52
+#define DISCOVERY_SHM_FW_SERVICES "fw.services"
+#define DISCOVERY_SHM_SRVC_PROPERTIES "srvc.props"
+
+
+#include <apr_strings.h>
+#include <apr_thread_proc.h>
#include "endpoint_listener.h"
@@ -43,6 +49,24 @@ struct ipc_shmData
char data[DISCOVERY_SHM_MEMSIZE - (2* (sizeof(int) + sizeof(key_t)))];
};
+struct discovery {
+ bundle_context_pt context;
+ apr_pool_t *pool;
+
+ hash_map_pt listenerReferences;
+
+ bool running;
+
+ int shmId;
+ void *shmBaseAdress;
+ apr_thread_t *shmPollThread;
+
+ hash_map_pt shmServices;
+
+ array_list_pt registered;
+};
+
+
typedef struct discovery *discovery_pt;
typedef struct ipc_shmData *ipc_shmData_pt;
Modified:
incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c
URL:
http://svn.apache.org/viewvc/incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c?rev=1601328&r1=1601327&r2=1601328&view=diff
==============================================================================
--- incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c
(original)
+++ incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c
Mon Jun 9 09:15:11 2014
@@ -1,5 +1,4 @@
-
/**
*Licensed to the Apache Software Foundation (ASF) under one
*or more contributor license agreements. See the NOTICE file
@@ -37,6 +36,7 @@
#include <sys/shm.h>
#include "bundle_context.h"
+#include "constants.h"
#include "array_list.h"
#include "utils.h"
#include "celix_errno.h"
@@ -47,24 +47,6 @@
#include "discovery.h"
-struct discovery {
- bundle_context_pt context;
- apr_pool_t *pool;
-
- hash_map_pt listenerReferences;
-
- bool running;
-
- int shmId;
- void *shmBaseAdress;
- apr_thread_t *shmPollThread;
-
- hash_map_pt shmServices;
-
- array_list_pt handled;
- array_list_pt registered;
-};
-
celix_status_t discovery_informListener(discovery_pt discovery,
endpoint_listener_pt listener, endpoint_description_pt endpoint);
celix_status_t discovery_informListenerOfRemoval(discovery_pt discovery,
endpoint_listener_pt listener, endpoint_description_pt endpoint);
@@ -74,13 +56,14 @@ celix_status_t discovery_removeService(d
static void *APR_THREAD_FUNC discovery_pollSHMServices(apr_thread_t *thd, void
*data);
celix_status_t discovery_lock(int semId, int semNr);
celix_status_t discovery_unlock(int semId, int semNr);
-celix_status_t discovery_wait(int semId, int semNr);
+celix_status_t discovery_broadcast(int semId, int semNr);
+celix_status_t discovery_stillAlive(char* pid, bool* stillAlive);
celix_status_t discovery_updateLocalSHMServices(discovery_pt discovery);
-celix_status_t discovery_updateSHMServices(discovery_pt discovery, char
*serviceName, char *nsEncAttributes);
+celix_status_t discovery_updateSHMServices(discovery_pt discovery,
endpoint_description_pt endpoint, bool addService);
-celix_status_t discovery_registerSHMService(discovery_pt discovery, char *url,
char *attributes);
-celix_status_t discovery_deregisterSHMService(discovery_pt discovery, char
*serviceName);
+celix_status_t discovery_registerSHMService(discovery_pt discovery,
endpoint_description_pt endpoint);
+celix_status_t discovery_deregisterSHMService(discovery_pt discovery,
endpoint_description_pt endpoint);
celix_status_t discovery_createOrAttachShm(discovery_pt discovery);
celix_status_t discovery_stopOrDetachShm(discovery_pt discovery);
@@ -99,8 +82,6 @@ celix_status_t discovery_create(apr_pool
(*discovery)->shmServices = hashMap_create(utils_stringHash,
NULL, utils_stringEquals, NULL);
(*discovery)->running = true;
- (*discovery)->handled = NULL;
- arrayList_create(&(*discovery)->handled);
(*discovery)->registered = NULL;
arrayList_create(&(*discovery)->registered);
@@ -109,7 +90,7 @@ celix_status_t discovery_create(apr_pool
if ((status = discovery_createOrAttachShm(*discovery)) !=
CELIX_SUCCESS)
{
- printf("DISCOVERY: Shared Memory initialization
failed.\n");
+ printf("DISCOVERY: Shared Memory initialization
failed.");
}
else
{
@@ -120,9 +101,18 @@ celix_status_t discovery_create(apr_pool
return status;
}
-celix_status_t discovery_stop(discovery_pt discovery) {
+celix_status_t discovery_destroy(discovery_pt discovery) {
celix_status_t status = CELIX_SUCCESS;
+ arrayList_destroy(discovery->registered);
+ hashMap_destroy(discovery->shmServices, false, false);
+ hashMap_destroy(discovery->listenerReferences, false, false);
+
+ return status;
+}
+
+celix_status_t discovery_stop(discovery_pt discovery) {
+ celix_status_t status = CELIX_SUCCESS;
apr_status_t tstat;
discovery->running = false;
@@ -130,14 +120,12 @@ celix_status_t discovery_stop(discovery_
if (shmData != NULL)
{
- discovery_unlock(shmData->semId, 1);
- discovery_wait(shmData->semId, 2);
- discovery_lock(shmData->semId, 1);
+ discovery_broadcast(shmData->semId, 1);
apr_status_t stat = apr_thread_join(&tstat,
discovery->shmPollThread);
if (stat != APR_SUCCESS && tstat != APR_SUCCESS)
- {
+ {
printf("DISCOVERY: An error occured while stopping the
SHM polling thread.\n");
status = CELIX_BUNDLE_EXCEPTION;
}
@@ -146,16 +134,14 @@ celix_status_t discovery_stop(discovery_
printf("DISCOVERY: SHM polling thread sucessfully
stopped.\n");
int i;
for (i = 0; i < arrayList_size(discovery->registered);
i++) {
- char *serviceName =
arrayList_get(discovery->registered, i);
- printf("DISCOVERY: deregistering service
%s.\n", serviceName);
- status =
discovery_deregisterSHMService(discovery, serviceName);
- }
- //discovery_lock(shmData->semId, 1);
+ endpoint_description_pt endpoint =
(endpoint_description_pt) arrayList_get(discovery->registered, i);
+ printf("DISCOVERY: deregistering service
%s.\n", endpoint->service);
+ status =
discovery_deregisterSHMService(discovery, endpoint);
+ }
// detach from shm
status = discovery_stopOrDetachShm(discovery);
-
}
}
@@ -172,9 +158,10 @@ celix_status_t discovery_removeService(d
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
service_reference_pt reference = hashMapEntry_getKey(entry);
endpoint_listener_pt listener = NULL;
- bundleContext_getService(discovery->context, reference,
(void**)&listener);
+ bundleContext_getService(discovery->context, reference,
(void**) &listener);
discovery_informListenerOfRemoval(discovery, listener,
endpoint);
}
+ hashMapIterator_destroy(iter);
return status;
}
@@ -187,7 +174,6 @@ celix_status_t discovery_addService(disc
printf("DISCOVERY: Add service (%s)\n", endpoint->service);
-
while (hashMapIterator_hasNext(iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
service_reference_pt reference = hashMapEntry_getKey(entry);
@@ -203,16 +189,15 @@ celix_status_t discovery_addService(disc
filter_match(filter, endpoint->properties, &matchResult);
if (matchResult) {
printf("DISCOVERY: Add service (%s)\n",
endpoint->service);
- bundleContext_getService(discovery->context, reference,
(void**)&listener);
+ bundleContext_getService(discovery->context, reference,
(void**) &listener);
discovery_informListener(discovery, listener, endpoint);
}
}
+ hashMapIterator_destroy(iter);
return status;
}
-
-
celix_status_t discovery_informListener(discovery_pt discovery,
endpoint_listener_pt listener, endpoint_description_pt endpoint) {
celix_status_t status = CELIX_SUCCESS;
@@ -226,16 +211,15 @@ celix_status_t discovery_informListenerO
return status;
}
-
celix_status_t discovery_lock(int semId, int semNr)
{
celix_status_t status = CELIX_SUCCESS;
int semOpStatus = 0;
struct sembuf semOperation;
- semOperation.sem_num=semNr;
- semOperation.sem_op=-1;
- semOperation.sem_flg=0;
+ semOperation.sem_num = semNr;
+ semOperation.sem_op = -1;
+ semOperation.sem_flg = 0;
do
{
@@ -245,21 +229,20 @@ celix_status_t discovery_lock(int semId,
{
status = CELIX_BUNDLE_EXCEPTION;
}
- } while(semOpStatus == -1 && errno == EINTR);
+ } while (semOpStatus == -1 && errno == EINTR);
return status;
}
-
celix_status_t discovery_unlock(int semId, int semNr)
{
celix_status_t status = CELIX_SUCCESS;
int semOpStatus = 0;
struct sembuf semOperation;
- semOperation.sem_num=semNr;
- semOperation.sem_op=1;
- semOperation.sem_flg=0;
+ semOperation.sem_num = semNr;
+ semOperation.sem_op = 1;
+ semOperation.sem_flg = 0;
do
{
@@ -269,77 +252,336 @@ celix_status_t discovery_unlock(int semI
{
status = CELIX_BUNDLE_EXCEPTION;
}
- } while(semOpStatus == -1 && errno == EINTR);
+ } while (semOpStatus == -1 && errno == EINTR);
+
+ return status;
+}
+
+celix_status_t discovery_broadcast(int semId, int semNr)
+{
+ celix_status_t status = CELIX_SUCCESS;
+ int semOpStatus = 0;
+ struct sembuf semOperation;
+
+ semOperation.sem_num = semNr;
+ semOperation.sem_op = semctl(semId, semNr, GETNCNT, 0) + 1; /* + 1
cause we also want to include out own process */
+ semOperation.sem_flg = 0;
+
+ do
+ {
+ status = CELIX_SUCCESS;
+ if ((semOpStatus = semop(semId, &semOperation, 1)) != 0)
+ {
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ } while (semOpStatus == -1 && errno == EINTR);
return status;
}
+celix_status_t discovery_decShmMapService(discovery_pt discovery, char*
encServiceMap, hash_map_pt outShmMap)
+{
+ celix_status_t status = CELIX_SUCCESS;
+
+ if ((status = netstring_decodeToHashMap(discovery->pool, encServiceMap,
outShmMap)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_decShmMapService : decoding
service data to hashmap failed\n");
+ }
+ else
+ {
+ // decode service properties as well
+ char* encServiceProps = hashMap_get(outShmMap,
DISCOVERY_SHM_SRVC_PROPERTIES);
+ hash_map_pt props = hashMap_create(utils_stringHash,
utils_stringHash, utils_stringEquals, utils_stringEquals);
+
+ if ((status = netstring_decodeToHashMap(discovery->pool,
encServiceProps, props)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_decShmMapService :
Decoding of endpointProperties failed\n");
+ }
+
+ hashMap_put(outShmMap, DISCOVERY_SHM_SRVC_PROPERTIES, props);
+ }
+
+ return status;
+}
-celix_status_t discovery_wait(int semId, int semNr)
+celix_status_t discovery_encShmMapService(discovery_pt discovery, hash_map_pt
inShmMap, char** outEncServiceMap)
{
- celix_status_t status = CELIX_SUCCESS;
- int semOpStatus = 0;
- struct sembuf semOperation;
+ celix_status_t status = CELIX_SUCCESS;
- semOperation.sem_num = semNr;
- semOperation.sem_op = 0;
- semOperation.sem_flg = 0;
+ // encode service properties as well
+ char* encServiceProps = NULL;
+ hash_map_pt props = hashMap_get(inShmMap,
DISCOVERY_SHM_SRVC_PROPERTIES);
- do
- {
- status = CELIX_SUCCESS;
+ if ((status = netstring_encodeFromHashMap(discovery->pool, props,
&encServiceProps)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_encShmMapService : encoding of
endpointProperties failed\n");
+ }
+ else
+ {
+ hashMap_put(inShmMap, DISCOVERY_SHM_SRVC_PROPERTIES,
encServiceProps);
+
+ if ((status = netstring_encodeFromHashMap(discovery->pool,
inShmMap, outEncServiceMap)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_encShmMapService :
encoding service data to hashmap failed\n");
+ }
+ }
- if ((semOpStatus = semop(semId, &semOperation, 1)) != 0)
- {
- status = CELIX_BUNDLE_EXCEPTION;
- }
- } while (semOpStatus == -1 && errno == EINTR);
+ // we can only free that if not allocated via apr
+ hashMap_destroy(props, false, false);
- return status;
+ return status;
}
+celix_status_t discovery_decShmMapDiscoveryInstance(discovery_pt discovery,
char* encDiscInstance, hash_map_pt outRegServices)
+{
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+ if ((status = netstring_decodeToHashMap(discovery->pool,
encDiscInstance, outRegServices)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_decShmMapDiscoveryInstance :
decoding data to properties failed\n");
+ }
+ else {
+
+ char* encServices = hashMap_get(outRegServices,
DISCOVERY_SHM_FW_SERVICES);
+ hash_map_pt fwServices = hashMap_create(utils_stringHash,
utils_stringHash, utils_stringEquals, utils_stringEquals);
+
+ if ((status = netstring_decodeToHashMap(discovery->pool,
encServices, fwServices)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_decShmMapDiscoveryInstance
: decoding services failed\n");
+ }
+ else
+ {
+ hash_map_iterator_pt shmItr =
hashMapIterator_create(fwServices);
+
+ while ((status == CELIX_SUCCESS) &&
(hashMapIterator_hasNext(shmItr) == true))
+ {
+ hash_map_pt regShmService =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+ hash_map_entry_pt shmSrvc =
hashMapIterator_nextEntry(shmItr);
+
+ char *serviceName =
hashMapEntry_getKey(shmSrvc);
+ char *encServiceMap =
hashMapEntry_getValue(shmSrvc);
+
+ if ((status =
discovery_decShmMapService(discovery, encServiceMap, regShmService)) ==
CELIX_SUCCESS)
+ {
+ hashMap_put(fwServices, serviceName,
regShmService);
+ }
+ }
+
+ hashMapIterator_destroy(shmItr);
+
+ hashMap_put(outRegServices, DISCOVERY_SHM_FW_SERVICES,
fwServices);
+ }
+ }
+ return status;
+}
-celix_status_t discovery_updateSHMServices(discovery_pt discovery, char
*serviceName, char *nsEncAttributes)
+// fwId -> hm_services
+celix_status_t discovery_encShmMapDiscoveryInstance(discovery_pt discovery,
hash_map_pt inFwAttr, char** outEncDiscoveryInstance)
{
celix_status_t status = CELIX_SUCCESS;
- if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL))
+ hash_map_pt inRegServices = hashMap_get(inFwAttr,
DISCOVERY_SHM_FW_SERVICES);
+ hash_map_iterator_pt shmItr = hashMapIterator_create(inRegServices);
+
+ while ((status == CELIX_SUCCESS) && (hashMapIterator_hasNext(shmItr) ==
true))
+ {
+ hash_map_entry_pt shmSrvc = hashMapIterator_nextEntry(shmItr);
+
+ char *encServiceMap = NULL;
+ char *serviceName = hashMapEntry_getKey(shmSrvc);
+
+ hash_map_pt regShmService = hashMapEntry_getValue(shmSrvc);
+
+ if ((status = discovery_encShmMapService(discovery,
regShmService, &encServiceMap)) == CELIX_SUCCESS)
+ {
+ hashMap_put(inRegServices, serviceName, encServiceMap);
+ }
+
+ hashMap_destroy(regShmService, false, false);
+ }
+
+ hashMapIterator_destroy(shmItr);
+
+ if (status == CELIX_SUCCESS)
+ {
+ char* outEncServices = NULL;
+
+ if ((status = netstring_encodeFromHashMap(discovery->pool,
inRegServices, &outEncServices)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY: discovery_encShmMapDiscoveryInstance
: encode services failed\n");
+ }
+ else
+ {
+ hashMap_put(inFwAttr, DISCOVERY_SHM_FW_SERVICES,
outEncServices);
+
+ if ((status =
netstring_encodeFromHashMap(discovery->pool, inFwAttr,
outEncDiscoveryInstance)) != CELIX_SUCCESS)
+ {
+ printf("DISCOVERY:
discovery_encShmMapDiscoveryInstance : encode discovery instances failed\n");
+ }
+
+ }
+ }
+
+ hashMap_destroy(inRegServices, false, false);
+
+ return status;
+}
+
+celix_status_t discovery_decShmMap(discovery_pt discovery, char* encMap,
hash_map_pt outRegDiscInstances)
+{
+ celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+ if ((status = netstring_decodeToHashMap(discovery->pool, encMap,
outRegDiscInstances)) != CELIX_SUCCESS)
{
+ printf("DISCOVERY: discovery_updateLocalSHMServices : decoding
data to properties failed\n");
+ }
+ else
+ {
+ hash_map_iterator_pt regDiscoveryInstancesItr =
hashMapIterator_create(outRegDiscInstances);
+
+ while ((status == CELIX_SUCCESS) &&
(hashMapIterator_hasNext(regDiscoveryInstancesItr) == true))
+ {
+ hash_map_pt regDiscoveryInstance =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+ hash_map_entry_pt regDiscoveryEntry =
hashMapIterator_nextEntry(regDiscoveryInstancesItr);
+
+ char* fwPid = hashMapEntry_getKey(regDiscoveryEntry);
+ char* encDiscoveryInstance =
hashMapEntry_getValue(regDiscoveryEntry);
+
+ if ((status =
discovery_decShmMapDiscoveryInstance(discovery, encDiscoveryInstance,
regDiscoveryInstance)) == CELIX_SUCCESS)
+ {
+ hashMap_put(outRegDiscInstances, fwPid,
regDiscoveryInstance);
+ }
+ }
+
+ hashMapIterator_destroy(regDiscoveryInstancesItr);
+
+ }
+
+ return status;
+}
+
+celix_status_t discovery_encShmMap(discovery_pt discovery, hash_map_pt
inRegDiscInstances, char** outEncMap)
+{
+ celix_status_t status = CELIX_SUCCESS;
+
+ hash_map_iterator_pt regDiscoveryInstancesItr =
hashMapIterator_create(inRegDiscInstances);
+
+ while ((status == CELIX_SUCCESS) &&
(hashMapIterator_hasNext(regDiscoveryInstancesItr) == true))
+ {
+ hash_map_entry_pt regDiscoveryEntry =
hashMapIterator_nextEntry(regDiscoveryInstancesItr);
+
+ char* encDiscoveryInstance = NULL;
+ char* fwPid = hashMapEntry_getKey(regDiscoveryEntry);
+ hash_map_pt regDiscoveryInstance =
hashMapEntry_getValue(regDiscoveryEntry);
+
+ if ((status = discovery_encShmMapDiscoveryInstance(discovery,
regDiscoveryInstance, &encDiscoveryInstance)) == CELIX_SUCCESS)
+ {
+ hashMap_put(inRegDiscInstances, fwPid,
encDiscoveryInstance);
+ }
+
+ hashMap_destroy(regDiscoveryInstance, false, false);
+ }
+
+ hashMapIterator_destroy(regDiscoveryInstancesItr);
+
+ if ((status == CELIX_SUCCESS) && ((status =
netstring_encodeFromHashMap(discovery->pool, inRegDiscInstances, outEncMap)) !=
CELIX_SUCCESS))
+ {
+ printf("DISCOVERY: discovery_encShmMapDiscoveryInstance :
encode shm map failed\n");
+ }
+
+ return status;
+}
+
+celix_status_t discovery_updateSHMServices(discovery_pt discovery,
endpoint_description_pt endpoint, bool addService)
+{
+ celix_status_t status = CELIX_SUCCESS;
+
+ if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL))
+ {
printf("DISCOVERY : shared memory not initialized.\n");
status = CELIX_BUNDLE_EXCEPTION;
}
else
{
- ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
+ ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
- if((status = discovery_lock(shmData->semId, 0)) ==
CELIX_SUCCESS)
+ if ((status = discovery_lock(shmData->semId, 0)) ==
CELIX_SUCCESS)
{
- hash_map_pt registeredShmServices =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+ hash_map_pt regDiscoveryInstances =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
- /* get already saved properties */
- if ((status =
netstring_decodeToHashMap(discovery->pool, shmData->data,
registeredShmServices)) != CELIX_SUCCESS)
+ if ((status = discovery_decShmMap(discovery,
&(shmData->data[0]), regDiscoveryInstances)) != CELIX_SUCCESS)
{
printf("DISCOVERY :
discovery_registerSHMService : decoding data to Properties failed\n");
}
else
{
- char *encShmServices = NULL;
+ char *uuid = NULL;
+ bundleContext_getProperty(discovery->context,
OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid);
+
+ hash_map_pt ownFramework =
hashMap_get(regDiscoveryInstances, uuid);
- if (nsEncAttributes != NULL)
+ if (ownFramework == NULL)
{
- hashMap_put(registeredShmServices,
serviceName, nsEncAttributes);
+ ownFramework =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+ hashMap_put(regDiscoveryInstances,
uuid, ownFramework);
+ }
+
+ hash_map_pt ownServices =
hashMap_get(ownFramework, DISCOVERY_SHM_FW_SERVICES);
+
+ if (ownServices == NULL)
+ {
+ ownServices =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+ hashMap_put(ownFramework,
DISCOVERY_SHM_FW_SERVICES, ownServices);
+ }
+
+ // check whether we want to add or remove a
service
+ if (addService == true)
+ {
+ // add service
+ hash_map_pt newService =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+
+ // we need to make a copy of the
properties
+ properties_pt endpointProperties =
properties_create();
+ hash_map_iterator_pt epItr =
hashMapIterator_create(endpoint->properties);
+
+ while (hashMapIterator_hasNext(epItr)
== true)
+ {
+ hash_map_entry_pt epEntry =
hashMapIterator_nextEntry(epItr);
+
properties_set(endpointProperties, (char*) hashMapEntry_getKey(epEntry),
(char*) hashMapEntry_getValue(epEntry));
+ }
+
+ hashMapIterator_destroy(epItr);
+
+ hashMap_put(newService,
DISCOVERY_SHM_SRVC_PROPERTIES, endpointProperties);
+ hashMap_put(ownServices,
endpoint->service, newService);
}
else
{
- hashMap_remove(registeredShmServices,
serviceName);
+ printf("remove Services %s\n",
endpoint->service);
+
+ hashMap_remove(ownServices,
endpoint->service);
+
+ // check if other services are
exported, otherwise remove framework/pid as well
+ // this is also important to ensure a
correct reference-counting (we assume that a discovery bundle crashed if we can
find
+ // the structure but the process with
the pid does not live anymore)
+ if (hashMap_size(ownServices) == 0)
+ {
+ printf("removing framework w/
uuid %s\n", uuid);
+
+ hashMap_remove(ownFramework,
DISCOVERY_SHM_FW_SERVICES);
+
hashMap_remove(regDiscoveryInstances, uuid);
+ }
+
}
- // write back
- if ((status =
netstring_encodeFromHashMap(discovery->pool, registeredShmServices,
&encShmServices)) == CELIX_SUCCESS)
+ // write back to shm
+ char* encShmMemStr = NULL;
+
+ if ((status = discovery_encShmMap(discovery,
regDiscoveryInstances, &encShmMemStr)) == CELIX_SUCCESS)
{
- strcpy(shmData->data, encShmServices);
+ strcpy(&(shmData->data[0]),
encShmMemStr);
}
else
{
@@ -347,35 +589,25 @@ celix_status_t discovery_updateSHMServic
}
}
- hashMap_destroy(registeredShmServices, false, false);
- discovery_unlock(shmData->semId, 0);
+ hashMap_destroy(regDiscoveryInstances, false, false);
- /* unlock and afterwards lock to inform all listener */
- discovery_unlock(shmData->semId, 1);
- // wait till notify semaphore is 0 to ensure all
threads have performed update routine
- discovery_wait(shmData->semId, 2);
- discovery_lock(shmData->semId, 1);
+ discovery_unlock(shmData->semId, 0);
+ discovery_broadcast(shmData->semId, 1);
}
}
return status;
}
-
-celix_status_t discovery_registerSHMService(discovery_pt discovery, char
*serviceName, char *nsEncAttributes)
+celix_status_t discovery_registerSHMService(discovery_pt discovery,
endpoint_description_pt endpoint)
{
- return discovery_updateSHMServices(discovery, serviceName,
nsEncAttributes);
+ return discovery_updateSHMServices(discovery, endpoint, true);
}
-
-
-celix_status_t discovery_deregisterSHMService(discovery_pt discovery, char
*serviceName)
+celix_status_t discovery_deregisterSHMService(discovery_pt discovery,
endpoint_description_pt endpoint)
{
- return discovery_updateSHMServices(discovery, serviceName, NULL);
+ return discovery_updateSHMServices(discovery, endpoint, false);
}
-
-
-
celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt
endpoint, char *machtedFilter) {
celix_status_t status = CELIX_SUCCESS;
printf("DISCOVERY: Endpoint for %s, with filter \"%s\" added\n",
endpoint->service, machtedFilter);
@@ -383,31 +615,25 @@ celix_status_t discovery_endpointAdded(v
if (status == CELIX_SUCCESS) {
- char *nsEncAttribute;
-
- netstring_encodeFromHashMap(discovery->pool, (hash_map_pt)
endpoint->properties, &nsEncAttribute);
-
- if ((status = discovery_registerSHMService(discovery,
endpoint->service, nsEncAttribute)) != CELIX_SUCCESS)
+ if ((status = discovery_registerSHMService(discovery,
endpoint)) != CELIX_SUCCESS)
{
printf("DISCOVERY: Error registering service (%s)
within shared memory \n", endpoint->service);
}
- arrayList_add(discovery->registered, strdup(endpoint->service));
+ arrayList_add(discovery->registered, endpoint);
}
return status;
}
-
celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt
endpoint, char *machtedFilter) {
celix_status_t status = CELIX_SUCCESS;
printf("DISCOVERY: Endpoint for %s, with filter \"%s\" removed\n",
endpoint->service, machtedFilter);
discovery_pt discovery = handle;
- char *serviceUrl = NULL;
if (status == CELIX_SUCCESS) {
- status = discovery_deregisterSHMService(discovery,
endpoint->service);
+ status = discovery_deregisterSHMService(discovery, endpoint);
int i;
for (i = 0; i < arrayList_size(discovery->registered); i++) {
char *url = arrayList_get(discovery->registered, i);
@@ -472,14 +698,22 @@ celix_status_t discovery_updateEndpointL
arrayList_add(scopes, scope);
}
- hash_map_iterator_pt iter =
hashMapIterator_create(discovery->shmServices);
+ hash_map_iterator_pt fwIter =
hashMapIterator_create(discovery->shmServices);
- while (hashMapIterator_hasNext(iter)) {
- hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
- char *key = hashMapEntry_getKey(entry);
- endpoint_description_pt value = hashMapEntry_getValue(entry);
- discovery_informListener(discovery, service, value);
+ while (hashMapIterator_hasNext(fwIter)) {
+ hash_map_entry_pt entry = hashMapIterator_nextEntry(fwIter);
+ hash_map_pt fwServices = hashMapEntry_getValue(entry);
+
+ hash_map_iterator_pt fwServicesIter =
hashMapIterator_create(fwServices);
+
+ while (hashMapIterator_hasNext(fwServicesIter))
+ {
+ endpoint_description_pt value =
(endpoint_description_pt) hashMapIterator_nextValue(fwServicesIter);
+ discovery_informListener(discovery, service, value);
+ }
+ hashMapIterator_destroy(fwServicesIter);
}
+ hashMapIterator_destroy(fwIter);
return status;
}
@@ -494,48 +728,47 @@ celix_status_t discovery_endpointListene
return status;
}
-
celix_status_t discovery_createOrAttachShm(discovery_pt discovery)
{
celix_status_t status = CELIX_SUCCESS;
- key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID);
+ key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID);
if ((discovery->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, 0666)) <
0)
- {
- printf("DISCOVERY : Could not attach to shared memory. Trying
to create shared memory segment. \n");
+ {
+ printf("DISCOVERY : Could not attach to shared memory. Trying
to create shared memory segment. \n");
- // trying to create shared memory
- if ((discovery->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE,
IPC_CREAT | 0666)) < 0)
- {
- printf("DISCOVERY : Creation of shared memory segment
failed\n");
- status = CELIX_BUNDLE_EXCEPTION;
- }
- else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0,
0)) == (char*) -1 )
- {
- printf("DISCOVERY : Attaching of shared memory segment
failed\n");
- status = CELIX_BUNDLE_EXCEPTION;
- }
- else
- {
- int semId = -1;
- key_t semKey = -1;
- ipc_shmData_pt shmData = NULL;
- printf("DISCOVERY : Shared memory segment successfully
created at %p\n", discovery->shmBaseAdress);
-
- // create structure
- shmData = apr_palloc(discovery->pool, sizeof(*shmData));
- semKey = ftok(DISCOVERY_SEM_FILENAME,
DISCOVERY_SEM_FTOK_ID);
+ // trying to create shared memory
+ if ((discovery->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE,
IPC_CREAT | 0666)) < 0)
+ {
+ printf("DISCOVERY : Creation of shared memory segment
failed\n");
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0,
0)) == (char*) -1)
+ {
+ printf("DISCOVERY : Attaching of shared memory segment
failed\n");
+ status = CELIX_BUNDLE_EXCEPTION;
+ }
+ else
+ {
+ int semId = -1;
+ key_t semKey = -1;
+ ipc_shmData_pt shmData = NULL;
+ printf("DISCOVERY : Shared memory segment successfully
created at %p\n", discovery->shmBaseAdress);
+
+ // create structure
+ shmData = calloc(1, sizeof(*shmData));
+ semKey = ftok(DISCOVERY_SEM_FILENAME,
DISCOVERY_SEM_FTOK_ID);
- if ((semId = semget(semKey, 3, 0666 | IPC_CREAT)) == -1)
- {
+ if ((semId = semget(semKey, 2, 0666 | IPC_CREAT)) == -1)
+ {
printf("DISCOVERY : Creation of semaphores
failed %i\n", semId);
}
else
{
// set
- if ( semctl (semId, 0, SETVAL, (int) 1) < 0)
- {
+ if (semctl(semId, 0, SETVAL, (int) 1) < 0)
+ {
printf(" DISCOVERY : error while
initializing semaphore 0 \n");
}
else
@@ -543,8 +776,8 @@ celix_status_t discovery_createOrAttachS
printf(" DISCOVERY : semaphore 0
initialized w/ %d\n", semctl(semId, 0, GETVAL, 0));
}
- if ( semctl (semId, 1, SETVAL, (int) 0) < 0)
- {
+ if (semctl(semId, 1, SETVAL, (int) 0) < 0)
+ {
printf(" DISCOVERY : error while
initializing semaphore 1\n");
}
else
@@ -552,31 +785,27 @@ celix_status_t discovery_createOrAttachS
printf(" DISCOVERY : semaphore 1
initialized w/ %d\n", semctl(semId, 1, GETVAL, 0));
}
- if ( semctl (semId, 2, SETVAL, (int) 0) < 0)
- {
- printf(" DISCOVERY : error while
initializing semaphore 2\n");
- }
- else
- {
- printf(" DISCOVERY : semaphore 2
initialized w/ %d\n", semctl(semId, 2, GETVAL, 0));
- }
-
shmData->semId = semId;
+ shmData->numListeners = 1;
+ printf(" numListeners is initalized: %d \n",
shmData->numListeners);
+
memcpy(discovery->shmBaseAdress, shmData,
sizeof(*shmData));
}
- }
+
+ free(shmData);
+ }
+ }
+ else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0, 0)) < 0)
+ {
+ printf("DISCOVERY : Attaching of shared memory segment
failed\n");
+ status = CELIX_BUNDLE_EXCEPTION;
}
- else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0, 0)) < 0)
- {
- printf("DISCOVERY : Attaching of shared memory segment
failed\n");
- status = CELIX_BUNDLE_EXCEPTION;
- }
- else
+ else
{
- ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
+ ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
discovery_lock(shmData->semId, 0);
- shmData->numListeners++;
+ shmData->numListeners++;
discovery_unlock(shmData->semId, 0);
discovery_updateLocalSHMServices(discovery);
}
@@ -584,28 +813,27 @@ celix_status_t discovery_createOrAttachS
return status;
}
-
-
celix_status_t discovery_stopOrDetachShm(discovery_pt discovery)
{
celix_status_t status = CELIX_SUCCESS;
- if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL))
- {
+ if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL))
+ {
printf("DISCOVERY : discovery_addNewEntry : shared memory not
initialized.\n");
status = CELIX_BUNDLE_EXCEPTION;
}
else
{
int listener = 0;
- ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
+ ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
discovery_lock(shmData->semId, 0);
- listener = shmData->numListeners--;
+ shmData->numListeners--;
+ printf(" numListeners decreased: %d \n", shmData->numListeners);
discovery_unlock(shmData->semId, 0);
- if (listener > 0)
- {
+ if (shmData->numListeners > 0)
+ {
printf("DISCOVERY: Detaching from Shared memory\n");
shmdt(discovery->shmBaseAdress);
}
@@ -624,124 +852,162 @@ celix_status_t discovery_stopOrDetachShm
return status;
}
-
celix_status_t discovery_updateLocalSHMServices(discovery_pt discovery)
{
celix_status_t status = CELIX_SUCCESS;
ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress;
- if((status = discovery_lock(shmData->semId, 0)) != CELIX_SUCCESS)
+ if ((status = discovery_lock(shmData->semId, 0)) != CELIX_SUCCESS)
{
- printf("DISCOVERY : discovery_updateLocalSHMServices : cannot acquire semaphore\n");
+ printf("cannot acquire semaphore");
}
else
{
- hash_map_pt registeredShmServices =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
+ hash_map_pt regDiscoveryInstances =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
- if ((status = netstring_decodeToHashMap(discovery->pool,
&(shmData->data[0]), registeredShmServices)) != CELIX_SUCCESS)
- {
- printf("DISCOVERY : discovery_updateLocalSHMServices : decoding data to properties failed\n");
- }
- else
+ if ((status = discovery_decShmMap(discovery,
&(shmData->data[0]), regDiscoveryInstances)) == CELIX_SUCCESS)
{
- /* check for new services */
- hash_map_iterator_pt shmPrpItr =
hashMapIterator_create(registeredShmServices);
+ hash_map_iterator_pt regDiscoveryInstancesItr =
hashMapIterator_create(regDiscoveryInstances);
- while(hashMapIterator_hasNext(shmPrpItr) == true)
+ while
(hashMapIterator_hasNext(regDiscoveryInstancesItr) == true)
{
- hash_map_entry_pt shmPrpEntry =
hashMapIterator_nextEntry(shmPrpItr);
- char *serviceName =
hashMapEntry_getKey(shmPrpEntry);
+ hash_map_entry_pt regDiscoveryEntry =
hashMapIterator_nextEntry(regDiscoveryInstancesItr);
- if(hashMap_get(discovery->shmServices,
serviceName) != NULL)
- {
- printf("DISCOVERY : discovery_updateLocalSHMServices : service with url %s already registered\n", serviceName );
- }
- else
+ char* uuid =
hashMapEntry_getKey(regDiscoveryEntry);
+ hash_map_pt fwAttr =
hashMapEntry_getValue(regDiscoveryEntry);
+ hash_map_pt services = hashMap_get(fwAttr,
DISCOVERY_SHM_FW_SERVICES);
+
+ /* check for new services */
+ hash_map_iterator_pt srvcItr =
hashMapIterator_create(services);
+
+ while (hashMapIterator_hasNext(srvcItr) == true)
{
- hash_map_pt props =
hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals,
utils_stringEquals);
- char *nsEncEndpointProp =
hashMapEntry_getValue(shmPrpEntry);
+ hash_map_entry_pt srvc =
hashMapIterator_nextEntry(srvcItr);
+
+ char *srvcName =
hashMapEntry_getKey(srvc);
+ hash_map_pt srvcAttr =
hashMapEntry_getValue(srvc);
+ hash_map_pt fwServices = NULL;
+
+ // check whether we have a service from that fw at all
+ if ((fwServices =
hashMap_get(discovery->shmServices, uuid)) == NULL)
+ {
+ fwServices =
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
hashMap_put(discovery->shmServices, uuid, fwServices);
+ }
- if ( (status =
netstring_decodeToHashMap(discovery->pool, nsEncEndpointProp, props)) !=
CELIX_SUCCESS)
+ if (hashMap_get(fwServices, srvcName)
!= NULL)
{
- printf("DISCOVERY : discovery_updateLocalSHMServices : Decoding of endpointProperties failed\n");
+ printf("DISCOVERY : discovery_updateLocalSHMServices : service with url %s from %s already registered", srvcName, uuid);
}
else
{
endpoint_description_pt
endpoint = apr_palloc(discovery->pool, sizeof(*endpoint));
- endpoint->id =
apr_pstrdup(discovery->pool, serviceName);
+ endpoint->id =
apr_pstrdup(discovery->pool, srvcName);
endpoint->serviceId = 42;
- endpoint->service =
apr_pstrdup(discovery->pool, serviceName);
- endpoint->properties =
(properties_pt) props;
+ endpoint->service =
apr_pstrdup(discovery->pool, srvcName);
+ endpoint->properties =
(properties_pt) hashMap_get(srvcAttr, DISCOVERY_SHM_SRVC_PROPERTIES);
+ endpoint->frameworkUUID = uuid;
discovery_addService(discovery,
endpoint);
-
hashMap_put(discovery->shmServices, apr_pstrdup(discovery->pool, serviceName),
endpoint);
+ hashMap_put(fwServices,
srvcName, endpoint);
}
}
- }
- hashMapIterator_destroy(shmPrpItr);
+ hashMapIterator_destroy(srvcItr);
+
+ /* check for obsolete services for this uuid */
+ hash_map_pt fwServices =
hashMap_get(discovery->shmServices, uuid);
+ hash_map_iterator_pt shmServicesItr =
hashMapIterator_create(fwServices);
- /* check for obsolete services */
- hash_map_iterator_pt shmServicesItr =
hashMapIterator_create(discovery->shmServices);
+ // iterate over frameworks
+ while (hashMapIterator_hasNext(shmServicesItr)
== true)
+ {
+ hash_map_entry_pt shmService =
hashMapIterator_nextEntry(shmServicesItr);
+ char *fwurl =
hashMapEntry_getKey(shmService);
- while(hashMapIterator_hasNext(shmServicesItr) == true)
+ if (hashMap_get(services, fwurl) ==
NULL)
+ {
+ printf("DISCOVERY: service with url %s from %s already unregistered", fwurl, uuid);
+ endpoint_description_pt
endpoint = hashMap_get(fwServices, fwurl);
+
discovery_removeService(discovery, endpoint);
+ hashMap_remove(fwServices,
fwurl);
+ }
+ }
+ hashMapIterator_destroy(shmServicesItr);
+ }
+ hashMapIterator_destroy(regDiscoveryInstancesItr);
+
+ /* check for obsolete frameworks*/
+ hash_map_iterator_pt lclFwItr =
hashMapIterator_create(discovery->shmServices);
+
+ // iterate over frameworks
+ while (hashMapIterator_hasNext(lclFwItr) == true)
{
- hash_map_entry_pt shmService =
hashMapIterator_nextEntry(shmServicesItr);
- char *url = hashMapEntry_getKey(shmService);
+ hash_map_entry_pt lclFwEntry =
hashMapIterator_nextEntry(lclFwItr);
+ char *fwUUID = hashMapEntry_getKey(lclFwEntry);
- if(hashMap_get(registeredShmServices, url) ==
NULL)
+ // whole framework not available any more
+ if (hashMap_get(regDiscoveryInstances, fwUUID)
== NULL)
{
- printf("DISCOVERY : discovery_updateLocalSHMServices : service with url %s unregistered\n", url);
- endpoint_description_pt endpoint =
hashMap_get(discovery->shmServices, url);
- discovery_removeService(discovery,
endpoint);
- hashMap_remove(discovery->shmServices,
url);
+ hash_map_pt lclFwServices = NULL;
+
+ if ((lclFwServices = (hash_map_pt)
hashMapEntry_getValue(lclFwEntry)) == NULL)
+ {
+ printf("UUID %s does not have any services, but a structure\n", fwUUID);
+ }
+ else
+ {
+ // remove them all
+ hash_map_iterator_pt
lclFwServicesItr = hashMapIterator_create(lclFwServices);
+
+ while
(hashMapIterator_hasNext(lclFwServicesItr) == true)
+ {
+ hash_map_entry_pt
lclFwSrvcEntry = hashMapIterator_nextEntry(lclFwServicesItr);
+
+
discovery_removeService(discovery, (endpoint_description_pt)
hashMapEntry_getValue(lclFwSrvcEntry));
+
hashMapIterator_remove(lclFwServicesItr);
+ }
+
+
hashMapIterator_destroy(lclFwServicesItr);
+ }
}
}
- hashMapIterator_destroy(shmServicesItr);
- }
+ hashMapIterator_destroy(lclFwItr);
+
+ }
+ hashMap_destroy(regDiscoveryInstances, false, false);
discovery_unlock(shmData->semId, 0);
- hashMap_destroy(registeredShmServices, false, false);
}
+
return status;
}
-
static void *APR_THREAD_FUNC discovery_pollSHMServices(apr_thread_t *thd, void
*data)
{
discovery_pt discovery = data;
celix_status_t status = CELIX_SUCCESS;
- if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL))
+ if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL))
{
- printf("DISCOVERY : discovery_pollSHMServices : shared memory not initialized.\n");
+ printf( "DISCOVERY: shared memory not initialized.");
status = CELIX_BUNDLE_EXCEPTION;
}
else
{
- ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
-
- printf("DISCOVERY : discovery_pollSHMServices thread started\n");
+ ipc_shmData_pt shmData = (ipc_shmData_pt)
discovery->shmBaseAdress;
while(discovery->running == true)
{
- if(((status = discovery_unlock(shmData->semId, 2)) !=
CELIX_SUCCESS) && (discovery->running == true))
- {
- printf("DISCOVERY : discovery_pollSHMServices : cannot acquire semaphore\n");
- }
- else if(((status = discovery_lock(shmData->semId, 1))
!= CELIX_SUCCESS) && (discovery->running == true))
+ if(((status = discovery_lock(shmData->semId, 1)) !=
CELIX_SUCCESS) && (discovery->running == true))
{
- printf("DISCOVERY : discovery_pollSHMServices : cannot acquire semaphore\n");
+ printf( "DISCOVERY: cannot acquire semaphore. Breaking main poll cycle.");
+ break;
}
else
{
discovery_updateLocalSHMServices(discovery);
-
- discovery_lock(shmData->semId, 2);
- discovery_unlock(shmData->semId, 1);
}
-
- sleep(1);
}
}
@@ -749,4 +1015,3 @@ static void *APR_THREAD_FUNC discovery_p
return NULL;
}
-