http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/framework/src/celix_launcher.c ---------------------------------------------------------------------- diff --cc framework/src/celix_launcher.c index ba83f25,0000000..fe5d0c0 mode 100644,000000..100644 --- a/framework/src/celix_launcher.c +++ b/framework/src/celix_launcher.c @@@ -1,242 -1,0 +1,315 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * celix_launcher.c + * + * \date Mar 23, 2010 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include "celix_launcher.h" + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <libgen.h> +#include <signal.h> + +#ifndef CELIX_NO_CURLINIT +#include <curl/curl.h> +#endif + +#include <string.h> +#include <curl/curl.h> +#include <signal.h> +#include <libgen.h> +#include "celix_launcher.h" +#include "framework.h" +#include "linked_list_iterator.h" + +static void show_usage(char* prog_name); +static void shutdown_framework(int signal); +static void ignore(int signal); + ++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework, properties_pt packedConfig); ++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework, properties_pt packedConfig); ++ +#define DEFAULT_CONFIG_FILE "config.properties" + +static framework_pt framework = NULL; + ++/** ++ * Method kept because of usage in examples & unit tests ++ */ +int celixLauncher_launchWithArgs(int argc, char *argv[]) { ++ return celixLauncher_launchWithArgsAndProps(argc, argv, NULL); ++} ++ ++int celixLauncher_launchWithArgsAndProps(int argc, char *argv[], properties_pt packedConfig) { + // Perform some minimal command-line option parsing... + char *opt = NULL; + if (argc > 1) { + opt = argv[1]; + } + + char *config_file = NULL; + + if (opt) { + // Check whether the user wants some help... + if (strcmp("-h", opt) == 0 || strcmp("-help", opt) == 0) { + show_usage(argv[0]); + return 0; + } else { + config_file = opt; + } + } else { + config_file = DEFAULT_CONFIG_FILE; + } + + struct sigaction sigact; + memset(&sigact, 0, sizeof(sigact)); + sigact.sa_handler = shutdown_framework; + sigaction(SIGINT, &sigact, NULL); + sigaction(SIGTERM, &sigact, NULL); + + memset(&sigact, 0, sizeof(sigact)); + sigact.sa_handler = ignore; + sigaction(SIGUSR1, &sigact, NULL); + sigaction(SIGUSR2, &sigact, NULL); + - int rc = celixLauncher_launch(config_file, &framework); ++ int rc = celixLauncher_launchWithConfigAndProps(config_file, &framework, packedConfig); + if (rc == 0) { + celixLauncher_waitForShutdown(framework); + celixLauncher_destroy(framework); + } + return rc; +} + +static void show_usage(char* prog_name) { + printf("Usage:\n %s [path/to/config.properties]\n\n", basename(prog_name)); +} + +static void shutdown_framework(int signal) { + if (framework != NULL) { + celixLauncher_stop(framework); //NOTE main thread will destroy + } +} + +static void ignore(int signal) { + //ignoring for signal SIGUSR1, SIGUSR2. Can be used on threads +} + +int celixLauncher_launch(const char *configFile, framework_pt *framework) { ++ return celixLauncher_launchWithConfigAndProps(configFile, framework, NULL); ++} ++ ++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework, properties_pt packedConfig){ + int status = 0; + FILE *config = fopen(configFile, "r"); - if (config != NULL) { ++ ++ if (config != NULL && packedConfig != NULL) { ++ status = celixLauncher_launchWithStreamAndProps(config, framework, packedConfig); ++ } else if (config != NULL) { + status = celixLauncher_launchWithStream(config, framework); ++ } else if (packedConfig != NULL) { ++ status = celixLauncher_launchWithProperties(packedConfig, framework); + } else { + fprintf(stderr, "Error: invalid or non-existing configuration file: '%s'.", configFile); + perror(""); + status = 1; + } ++ + return status; +} + +int celixLauncher_launchWithStream(FILE *stream, framework_pt *framework) { + int status = 0; + + properties_pt config = properties_loadWithStream(stream); + fclose(stream); + // Make sure we've read it and that nothing went wrong with the file access... + if (config == NULL) { + fprintf(stderr, "Error: invalid configuration file"); + perror(NULL); + status = 1; + } + else { + status = celixLauncher_launchWithProperties(config, framework); + } + + return status; +} + ++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework, properties_pt packedConfig){ ++ int status = 0; ++ ++ properties_pt runtimeConfig = properties_loadWithStream(stream); ++ fclose(stream); ++ ++ // Make sure we've read it and that nothing went wrong with the file access... ++ // If there is no runtimeConfig, the packedConfig can be stored as global config ++ if (runtimeConfig == NULL){ ++ runtimeConfig = packedConfig; ++ } ++ ++ if (runtimeConfig == NULL) { ++ fprintf(stderr, "Error: invalid configuration file"); ++ perror(NULL); ++ status = 1; ++ } else { ++ // Check if there's a pre-compiled config available ++ if (packedConfig != NULL){ ++ // runtimeConfig and packedConfig must be merged ++ // when a duplicate of a key is available, the runtimeConfig must be prioritized ++ ++ hash_map_iterator_t iter = hashMapIterator_construct(packedConfig); ++ ++ hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); ++ ++ while (entry != NULL) { ++ const char * key = (const char *) hashMapEntry_getKey(entry); ++ const char * value = (const char *) hashMapEntry_getValue(entry); ++ ++ // Check existence of key in runtimeConfig ++ if (!hashMap_containsKey(runtimeConfig, key)) { ++ properties_set(runtimeConfig, key, value); ++ } ++ ++ entry = hashMapIterator_nextEntry(&iter); ++ if (entry != NULL) { ++ key = (const char *) hashMapEntry_getKey(entry); ++ value = (const char *) hashMapEntry_getValue(entry); ++ } ++ } ++ ++ // normally, the framework_destroy will clean up the properties_pt ++ // since there are 2 properties_pt available (runtimeConfig and packedConfig) ++ // the packedConfig must be destroyed ++ properties_destroy(packedConfig); ++ } ++ ++ status = celixLauncher_launchWithProperties(runtimeConfig, framework); ++ } ++ ++ return status; ++} + +int celixLauncher_launchWithProperties(properties_pt config, framework_pt *framework) { + celix_status_t status; +#ifndef CELIX_NO_CURLINIT + // Before doing anything else, let's setup Curl + curl_global_init(CURL_GLOBAL_NOTHING); +#endif + + const char* autoStartProp = properties_get(config, "cosgi.auto.start.1"); + char* autoStart = NULL; + if (autoStartProp != NULL) { + autoStart = strndup(autoStartProp, 1024*10); + } + + status = framework_create(framework, config); + bundle_pt fwBundle = NULL; + if (status == CELIX_SUCCESS) { + status = fw_init(*framework); + if (status == CELIX_SUCCESS) { + // Start the system bundle + status = framework_getFrameworkBundle(*framework, &fwBundle); + + if(status == CELIX_SUCCESS){ + bundle_start(fwBundle); + + char delims[] = " "; + char *result = NULL; + char *save_ptr = NULL; + linked_list_pt bundles; + array_list_pt installed = NULL; + bundle_context_pt context = NULL; + linked_list_iterator_pt iter = NULL; + unsigned int i; + + linkedList_create(&bundles); + result = strtok_r(autoStart, delims, &save_ptr); + while (result != NULL) { + char *location = strdup(result); + linkedList_addElement(bundles, location); + result = strtok_r(NULL, delims, &save_ptr); + } + // First install all bundles + // Afterwards start them + arrayList_create(&installed); + bundle_getContext(fwBundle, &context); + iter = linkedListIterator_create(bundles, 0); + while (linkedListIterator_hasNext(iter)) { + bundle_pt current = NULL; + char *location = (char *) linkedListIterator_next(iter); + if (bundleContext_installBundle(context, location, ¤t) == CELIX_SUCCESS) { + // Only add bundle if it is installed correctly + arrayList_add(installed, current); + } else { + printf("Could not install bundle from %s\n", location); + } + linkedListIterator_remove(iter); + free(location); + } + linkedListIterator_destroy(iter); + linkedList_destroy(bundles); + + for (i = 0; i < arrayList_size(installed); i++) { + bundle_pt installedBundle = (bundle_pt) arrayList_get(installed, i); + bundle_startWithOptions(installedBundle, 0); + } + + arrayList_destroy(installed); + } + } + } + + if (status != CELIX_SUCCESS) { + printf("Problem creating framework\n"); + } + + printf("Launcher: Framework Started\n"); + + free(autoStart); + + return status; +} + +void celixLauncher_waitForShutdown(framework_pt framework) { + framework_waitForStop(framework); +} + +void celixLauncher_destroy(framework_pt framework) { + framework_destroy(framework); + +#ifndef CELIX_NO_CURLINIT + // Cleanup Curl + curl_global_cleanup(); +#endif + + printf("Launcher: Exit\n"); +} + +void celixLauncher_stop(framework_pt framework) { + bundle_pt fwBundle = NULL; + if( framework_getFrameworkBundle(framework, &fwBundle) == CELIX_SUCCESS){ + bundle_stop(fwBundle); + } +}
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/pubsub/pubsub_admin_udp_mc/src/topic_publication.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_admin_udp_mc/src/topic_publication.c index e43ec29,0000000..44106df mode 100644,000000..100644 --- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c +++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c @@@ -1,444 -1,0 +1,437 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * - * htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0 ++ * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ - /* - * topic_publication.c - * - * \date Sep 24, 2015 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ + +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <errno.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "array_list.h" +#include "celixbool.h" +#include "service_registration.h" +#include "utils.h" +#include "service_factory.h" +#include "version.h" + +#include "topic_publication.h" +#include "pubsub_common.h" +#include "publisher.h" +#include "large_udp.h" + +#include "pubsub_serializer.h" + +#define EP_ADDRESS_LEN 32 + +#define FIRST_SEND_DELAY 2 + +struct topic_publication { + int sendSocket; + char* endpoint; + service_registration_pt svcFactoryReg; + array_list_pt pub_ep_list; //List<pubsub_endpoint> + hash_map_pt boundServices; //<bundle_pt,bound_service> + celix_thread_mutex_t tp_lock; + pubsub_serializer_service_t *serializer; + struct sockaddr_in destAddr; +}; + +typedef struct publish_bundle_bound_service { + topic_publication_pt parent; + pubsub_publisher_t service; + bundle_pt bundle; + char *scope; + char *topic; + hash_map_pt msgTypes; + unsigned short getCount; + celix_thread_mutex_t mp_lock; + largeUdp_pt largeUdpHandle; +}* publish_bundle_bound_service_pt; + + +typedef struct pubsub_msg{ + pubsub_msg_header_pt header; + char* payload; + unsigned int payloadSize; +} pubsub_msg_t; + + +static unsigned int rand_range(unsigned int min, unsigned int max); + +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); + +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); + +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); + +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); + + +static void delay_first_send_for_late_joiners(void); + + +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){ + + char* ep = malloc(EP_ADDRESS_LEN); + memset(ep,0,EP_ADDRESS_LEN); + unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT); + snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port); + + + topic_publication_pt pub = calloc(1,sizeof(*pub)); + + arrayList_create(&(pub->pub_ep_list)); + pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL); + celixThreadMutex_create(&(pub->tp_lock),NULL); + + pub->endpoint = ep; + pub->sendSocket = sendSocket; + pub->destAddr.sin_family = AF_INET; + pub->destAddr.sin_addr.s_addr = inet_addr(bindIP); + pub->destAddr.sin_port = htons(port); + + pub->serializer = best_serializer; + + pubsub_topicPublicationAddPublisherEP(pub,pubEP); + + *out = pub; + + return CELIX_SUCCESS; +} + +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&(pub->tp_lock)); + + free(pub->endpoint); + arrayList_destroy(pub->pub_ep_list); + + hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); + while(hashMapIterator_hasNext(iter)){ + publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); + pubsub_destroyPublishBundleBoundService(bound); + } + hashMapIterator_destroy(iter); + hashMap_destroy(pub->boundServices,false,false); + + pub->svcFactoryReg = NULL; + pub->serializer = NULL; + + if(close(pub->sendSocket) != 0){ + status = CELIX_FILE_IO_EXCEPTION; + } + + celixThreadMutex_unlock(&(pub->tp_lock)); + + celixThreadMutex_destroy(&(pub->tp_lock)); + + free(pub); + + return status; +} + +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){ + celix_status_t status = CELIX_SUCCESS; + + /* Let's register the new service */ + + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); + + if(pubEP!=NULL){ + service_factory_pt factory = calloc(1, sizeof(*factory)); + factory->handle = pub; + factory->getService = pubsub_topicPublicationGetService; + factory->ungetService = pubsub_topicPublicationUngetService; + + properties_pt props = properties_create(); + properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); + properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); + + status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); + + if(status != CELIX_SUCCESS){ + properties_destroy(props); + printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID); + } + else{ + *svcFactory = factory; + } + } + else{ + printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); + status = CELIX_SERVICE_EXCEPTION; + } + + return status; +} + +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ + return serviceRegistration_unregister(pub->svcFactoryReg); +} + +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ + + celixThreadMutex_lock(&(pub->tp_lock)); + ep->endpoint = strdup(pub->endpoint); + arrayList_add(pub->pub_ep_list,ep); + celixThreadMutex_unlock(&(pub->tp_lock)); + + return CELIX_SUCCESS; +} + +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ + + celixThreadMutex_lock(&(pub->tp_lock)); + arrayList_removeElement(pub->pub_ep_list,ep); + celixThreadMutex_unlock(&(pub->tp_lock)); + + return CELIX_SUCCESS; +} + +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ + array_list_pt list = NULL; + celixThreadMutex_lock(&(pub->tp_lock)); + list = arrayList_clone(pub->pub_ep_list); + celixThreadMutex_unlock(&(pub->tp_lock)); + return list; +} + + +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { + celix_status_t status = CELIX_SUCCESS; + + topic_publication_pt publish = (topic_publication_pt)handle; + + celixThreadMutex_lock(&(publish->tp_lock)); + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound==NULL){ + bound = pubsub_createPublishBundleBoundService(publish,bundle); + if(bound!=NULL){ + hashMap_put(publish->boundServices,bundle,bound); + } + } + else{ + bound->getCount++; + } + + if (bound != NULL) { + *service = &bound->service; + } + + celixThreadMutex_unlock(&(publish->tp_lock)); + + return status; +} + +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { + + topic_publication_pt publish = (topic_publication_pt)handle; + + celixThreadMutex_lock(&(publish->tp_lock)); + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound!=NULL){ + + bound->getCount--; + if(bound->getCount==0){ + pubsub_destroyPublishBundleBoundService(bound); + hashMap_remove(publish->boundServices,bundle); + } + + } + else{ + long bundleId = -1; + bundle_getBundleId(bundle,&bundleId); + printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId); + } + + /* service should be never used for unget, so let's set the pointer to NULL */ + *service = NULL; + + celixThreadMutex_unlock(&(publish->tp_lock)); + + return CELIX_SUCCESS; +} + +static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){ + const int iovec_len = 3; // header + size + payload + bool ret = true; + + struct iovec msg_iovec[iovec_len]; + msg_iovec[0].iov_base = msg->header; + msg_iovec[0].iov_len = sizeof(*msg->header); + msg_iovec[1].iov_base = &msg->payloadSize; + msg_iovec[1].iov_len = sizeof(msg->payloadSize); + msg_iovec[2].iov_base = msg->payload; + msg_iovec[2].iov_len = msg->payloadSize; + + delay_first_send_for_late_joiners(); + + if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) { + perror("send_pubsub_msg:sendSocket"); + ret = false; + } + + if(releaseCallback) { + releaseCallback->release(msg->payload, bound); + } + return ret; + +} + + +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) { + int status = 0; + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; + + celixThreadMutex_lock(&(bound->parent->tp_lock)); + celixThreadMutex_lock(&(bound->mp_lock)); + - pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId); ++ pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(intptr_t)msgTypeId); + + if (msgSer != NULL) { + int major=0, minor=0; + + pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); + strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); + msg_hdr->type = msgTypeId; + + + if (msgSer->msgVersion != NULL){ + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); + msg_hdr->major = major; + msg_hdr->minor = minor; + } + + void* serializedOutput = NULL; + size_t serializedOutputLen = 0; + msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); + + pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t)); + msg->header = msg_hdr; + msg->payload = (char*)serializedOutput; + msg->payloadSize = serializedOutputLen; + + + if(send_pubsub_msg(bound, msg,true, NULL) == false) { + status = -1; + } + free(msg_hdr); + free(msg); + free(serializedOutput); + + + } else { + printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %d\n", msgTypeId); + status=-1; + } + + celixThreadMutex_unlock(&(bound->mp_lock)); + celixThreadMutex_unlock(&(bound->parent->tp_lock)); + + return status; +} + +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; +} + + +static unsigned int rand_range(unsigned int min, unsigned int max){ + + double scaled = (double)(((double)random())/((double)RAND_MAX)); + return (max-min+1)*scaled + min; + +} + +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ + + publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); + + if (bound != NULL) { + + bound->parent = tp; + bound->bundle = bundle; + bound->getCount = 1; + celixThreadMutex_create(&bound->mp_lock,NULL); + + if(tp->serializer != NULL){ + tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes); + } + + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); + bound->scope=strdup(pubEP->scope); + bound->topic=strdup(pubEP->topic); + bound->largeUdpHandle = largeUdp_create(1); + + bound->service.handle = bound; + bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->service.send = pubsub_topicPublicationSend; + bound->service.sendMultipart = NULL; //Multipart not supported for UDP + + } + + return bound; +} + +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ + + celixThreadMutex_lock(&boundSvc->mp_lock); + + if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){ + boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes); + } + + if(boundSvc->scope!=NULL){ + free(boundSvc->scope); + } + + if(boundSvc->topic!=NULL){ + free(boundSvc->topic); + } + + largeUdp_destroy(boundSvc->largeUdpHandle); + + celixThreadMutex_unlock(&boundSvc->mp_lock); + celixThreadMutex_destroy(&boundSvc->mp_lock); + + free(boundSvc); + +} + +static void delay_first_send_for_late_joiners(){ + + static bool firstSend = true; + + if(firstSend){ + printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY); + firstSend = false; + } +} http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/shell/CMakeLists.txt ---------------------------------------------------------------------- diff --cc shell/CMakeLists.txt index ae8cf3f,11a16c1..b8aaac3 --- a/shell/CMakeLists.txt +++ b/shell/CMakeLists.txt @@@ -18,35 -18,38 +18,35 @@@ celix_subproject(SHELL "Option to enabl if (SHELL) find_package(CURL REQUIRED) + add_library(shell_api INTERFACE) + target_include_directories(shell_api INTERFACE include) + add_bundle(shell SYMBOLIC_NAME "apache_celix_shell" - VERSION "2.0.0" + VERSION "2.1.0" NAME "Apache Celix Shell" - SOURCES + src/activator + src/shell + src/lb_command + src/start_command + src/stop_command + src/install_command + src/update_command + src/uninstall_command + src/log_command + src/inspect_command + src/help_command + ) + target_include_directories(shell PRIVATE src ${CURL_INCLUDE_DIRS}) + target_link_libraries(shell PRIVATE Celix::shell_api ${CURL_LIBRARIES} Celix::log_service_api Celix::log_helper) - private/src/activator - private/src/shell - private/src/lb_command - private/src/start_command - private/src/stop_command - private/src/install_command - private/src/update_command - private/src/uninstall_command - private/src/log_command - private/src/inspect_command - private/src/help_command - - ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c - - ) - - install_bundle(shell + install_bundle(shell HEADERS - public/include/shell.h public/include/command.h public/include/shell_constants.h - ) + include/shell.h include/command.h include/shell_constants.h + ) - include_directories("public/include") - include_directories("private/include") - include_directories("${PROJECT_SOURCE_DIR}/utils/public/include") - include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include") - include_directories(${CURL_INCLUDE_DIRS}) - target_link_libraries(shell celix_framework ${CURL_LIBRARIES}) + #Setup target aliases to match external usage + add_library(Celix::shell_api ALIAS shell_api) + add_library(Celix::shell ALIAS shell) endif (SHELL) http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/include/properties.h ---------------------------------------------------------------------- diff --cc utils/include/properties.h index cf93ca0,0000000..5c6dc4d mode 100644,000000..100644 --- a/utils/include/properties.h +++ b/utils/include/properties.h @@@ -1,66 -1,0 +1,68 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * properties.h + * + * \date Apr 27, 2010 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PROPERTIES_H_ +#define PROPERTIES_H_ + +#include <stdio.h> + +#include "hash_map.h" +#include "exports.h" +#include "celix_errno.h" +#ifdef __cplusplus +extern "C" { +#endif +typedef hash_map_pt properties_pt; +typedef hash_map_t properties_t; + +UTILS_EXPORT properties_pt properties_create(void); + +UTILS_EXPORT void properties_destroy(properties_pt properties); + +UTILS_EXPORT properties_pt properties_load(const char *filename); + +UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream); + ++UTILS_EXPORT properties_pt properties_loadFromString(const char *input); ++ +UTILS_EXPORT void properties_store(properties_pt properties, const char *file, const char *header); + +UTILS_EXPORT const char *properties_get(properties_pt properties, const char *key); + +UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, const char *key, const char *defaultValue); + +UTILS_EXPORT void properties_set(properties_pt properties, const char *key, const char *value); + +UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, properties_pt *copy); + +#define PROPERTIES_FOR_EACH(props, key) \ + for(hash_map_iterator_t iter = hashMapIterator_construct(props); \ + hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);) +#ifdef __cplusplus +} +#endif + +#endif /* PROPERTIES_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/src/properties.c ---------------------------------------------------------------------- diff --cc utils/src/properties.c index 0bd6dc3,0000000..1e097a0 mode 100644,000000..100644 --- a/utils/src/properties.c +++ b/utils/src/properties.c @@@ -1,302 -1,0 +1,330 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * properties.c + * + * \date Apr 27, 2010 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <ctype.h> +#include "celixbool.h" +#include "properties.h" +#include "utils.h" + +#define MALLOC_BLOCK_SIZE 5 + +static void parseLine(const char* line, properties_pt props); + +properties_pt properties_create(void) { + return hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); +} + +void properties_destroy(properties_pt properties) { + hash_map_iterator_pt iter = hashMapIterator_create(properties); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free(hashMapEntry_getKey(entry)); + free(hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(properties, false, false); +} + +properties_pt properties_load(const char* filename) { + FILE *file = fopen(filename, "r"); + if(file==NULL){ + return NULL; + } + properties_pt props = properties_loadWithStream(file); + fclose(file); + return props; +} + +properties_pt properties_loadWithStream(FILE *file) { + properties_pt props = NULL; + + + if (file != NULL ) { + char *saveptr; + char *filebuffer = NULL; + char *line = NULL; + size_t file_size = 0; + + props = properties_create(); + fseek(file, 0, SEEK_END); + file_size = ftell(file); + fseek(file, 0, SEEK_SET); + + if(file_size > 0){ + filebuffer = calloc(file_size + 1, sizeof(char)); + if(filebuffer) { + size_t rs = fread(filebuffer, sizeof(char), file_size, file); + if(rs != file_size){ + fprintf(stderr,"fread read only %lu bytes out of %lu\n",rs,file_size); + } + filebuffer[file_size]='\0'; + line = strtok_r(filebuffer, "\n", &saveptr); + while ( line != NULL ) { + parseLine(line, props); + line = strtok_r(NULL, "\n", &saveptr); + } + free(filebuffer); + } + } + } + + return props; +} + ++properties_pt properties_loadFromString(const char *input){ ++ properties_pt props = properties_create(); ++ ++ char *in = strdup(input); ++ char *line = NULL; ++ char *saveLinePointer = NULL; ++ ++ bool firstTime = true; ++ do { ++ if (firstTime){ ++ line = strtok_r(in, "\n", &saveLinePointer); ++ firstTime = false; ++ }else { ++ line = strtok_r(NULL, "\n", &saveLinePointer); ++ } ++ ++ if (line == NULL){ ++ break; ++ } ++ ++ parseLine(line, props); ++ } while(line != NULL); ++ ++ free(in); ++ ++ return props; ++} ++ + +/** + * Header is ignored for now, cannot handle comments yet + */ +void properties_store(properties_pt properties, const char* filename, const char* header) { + FILE *file = fopen ( filename, "w+" ); + char *str; + + if (file != NULL) { + if (hashMap_size(properties) > 0) { + hash_map_iterator_pt iterator = hashMapIterator_create(properties); + while (hashMapIterator_hasNext(iterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); + str = hashMapEntry_getKey(entry); + for (int i = 0; i < strlen(str); i += 1) { + if (str[i] == '#' || str[i] == '!' || str[i] == '=' || str[i] == ':') { + fputc('\\', file); + } + fputc(str[i], file); + } + + fputc('=', file); + + str = hashMapEntry_getValue(entry); + for (int i = 0; i < strlen(str); i += 1) { + if (str[i] == '#' || str[i] == '!' || str[i] == '=' || str[i] == ':') { + fputc('\\', file); + } + fputc(str[i], file); + } + + fputc('\n', file); + + } + hashMapIterator_destroy(iterator); + } + fclose(file); + } else { + perror("File is null"); + } +} + +celix_status_t properties_copy(properties_pt properties, properties_pt *out) { + celix_status_t status = CELIX_SUCCESS; + properties_pt copy = properties_create(); + + if (copy != NULL) { + hash_map_iterator_pt iter = hashMapIterator_create(properties); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + char *key = hashMapEntry_getKey(entry); + char *value = hashMapEntry_getValue(entry); + properties_set(copy, key, value); + } + hashMapIterator_destroy(iter); + } else { + status = CELIX_ENOMEM; + } + + if (status == CELIX_SUCCESS) { + *out = copy; + } + + return status; +} + +const char* properties_get(properties_pt properties, const char* key) { + return hashMap_get(properties, (void*)key); +} + +const char* properties_getWithDefault(properties_pt properties, const char* key, const char* defaultValue) { + const char* value = properties_get(properties, key); + return value == NULL ? defaultValue : value; +} + +void properties_set(properties_pt properties, const char* key, const char* value) { + hash_map_entry_pt entry = hashMap_getEntry(properties, key); + char* oldValue = NULL; + if (entry != NULL) { + char* oldKey = hashMapEntry_getKey(entry); + oldValue = hashMapEntry_getValue(entry); + hashMap_put(properties, oldKey, strndup(value, 1024*10)); + } else { + hashMap_put(properties, strndup(key, 1024*10), strndup(value, 1024*10)); + } + free(oldValue); +} + +static void updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len, int *value_len) { + if (*output == *key) { + if (outputPos == (*key_len) - 1) { + (*key_len) += MALLOC_BLOCK_SIZE; + *key = realloc(*key, *key_len); + *output = *key; + } + } + else { + if (outputPos == (*value_len) - 1) { + (*value_len) += MALLOC_BLOCK_SIZE; + *value = realloc(*value, *value_len); + *output = *value; + } + } +} + +static void parseLine(const char* line, properties_pt props) { + int linePos = 0; + bool precedingCharIsBackslash = false; + bool isComment = false; + int outputPos = 0; + char *output = NULL; + int key_len = MALLOC_BLOCK_SIZE; + int value_len = MALLOC_BLOCK_SIZE; + linePos = 0; + precedingCharIsBackslash = false; + isComment = false; + output = NULL; + outputPos = 0; + + //Ignore empty lines + if (line[0] == '\n' && line[1] == '\0') { + return; + } + + char *key = calloc(1, key_len); + char *value = calloc(1, value_len); + key[0] = '\0'; + value[0] = '\0'; + + while (line[linePos] != '\0') { + if (line[linePos] == ' ' || line[linePos] == '\t') { + if (output == NULL) { + //ignore + linePos += 1; + continue; + } + } + else { + if (output == NULL) { + output = key; + } + } + if (line[linePos] == '=' || line[linePos] == ':' || line[linePos] == '#' || line[linePos] == '!') { + if (precedingCharIsBackslash) { + //escaped special character + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + precedingCharIsBackslash = false; + } + else { + if (line[linePos] == '#' || line[linePos] == '!') { + if (outputPos == 0) { + isComment = true; + break; + } + else { + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + } + else { // = or : + if (output == value) { //already have a seperator + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + else { + output[outputPos++] = '\0'; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + output = value; + outputPos = 0; + } + } + } + } + else if (line[linePos] == '\\') { + if (precedingCharIsBackslash) { //double backslash -> backslash + output[outputPos++] = '\\'; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + precedingCharIsBackslash = true; + } + else { //normal character + precedingCharIsBackslash = false; + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + linePos += 1; + } + if (output != NULL) { + output[outputPos] = '\0'; + } + + if (!isComment) { + //printf("putting 'key'/'value' '%s'/'%s' in properties\n", utils_stringTrim(key), utils_stringTrim(value)); + properties_set(props, utils_stringTrim(key), utils_stringTrim(value)); + } + if(key) { + free(key); + } + if(value) { + free(value); + } + +}
