This is an automated email from the ASF dual-hosted git repository.
rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new 87c9681 Added option for custom msgId (#38)
87c9681 is described below
commit 87c9681c0dcafbe4b50fea5b1b3f7ef65c06cae6
Author: Roy Lenferink <[email protected]>
AuthorDate: Sun Sep 1 16:05:05 2019 +0200
Added option for custom msgId (#38)
---
.../pubsub/msg_descriptors/msg_poi2.descriptor | 1 +
.../private/include/pubsub_publisher_private.h | 20 ++---
.../publisher/private/src/ps_pub_activator.c | 6 +-
.../publisher/private/src/pubsub_publisher.c | 91 ++++++++++++----------
.../private/include/pubsub_subscriber_private.h | 10 +--
.../subscriber/private/src/ps_sub_activator.c | 12 +--
.../subscriber/private/src/pubsub_subscriber.c | 12 ++-
.../pubsub_admin_tcp/src/pubsub_tcp_common.c | 10 +--
.../pubsub_admin_tcp/src/pubsub_tcp_common.h | 1 -
.../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 26 ++++---
.../pubsub_admin_udp_mc/src/pubsub_udpmc_common.c | 5 --
.../pubsub_admin_udp_mc/src/pubsub_udpmc_common.h | 2 -
.../src/pubsub_udpmc_topic_receiver.c | 4 +-
.../src/pubsub_udpmc_topic_sender.c | 16 ++++
.../src/pubsub_websocket_common.c | 5 --
.../src/pubsub_websocket_common.h | 1 -
.../src/pubsub_websocket_topic_sender.c | 11 ++-
.../pubsub_admin_zmq/src/pubsub_zmq_common.c | 5 --
.../pubsub_admin_zmq/src/pubsub_zmq_common.h | 1 -
.../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 13 +++-
.../pubsub/pubsub_api/include/pubsub/publisher.h | 5 +-
.../pubsub/pubsub_api/include/pubsub/subscriber.h | 1 -
.../src/pubsub_serializer_impl.c | 74 ++++++++++++------
.../src/pubsub_serializer_impl.h | 2 +-
bundles/pubsub/pubsub_spi/CMakeLists.txt | 2 +-
bundles/pubsub/pubsub_spi/include/pubsub_utils.h | 2 +-
bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 20 ++---
27 files changed, 197 insertions(+), 161 deletions(-)
diff --git a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
index 5d9504a..32870e8 100644
--- a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
@@ -4,6 +4,7 @@ name=poi2
version=1.0.0
:annotations
classname=org.example.PointOfInterest
+msgId=5555
:types
location={DD lat lon}
:message
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
index 46d6cb7..113cebb 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
+++ b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
@@ -31,21 +31,21 @@ struct pubsub_sender {
long bundleId;
bool stop;
};
+typedef struct pubsub_sender pubsub_sender_t;
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
- pubsub_publisher_pt service;
- pubsub_sender_pt publisher;
+struct send_thread_struct {
+ pubsub_publisher_t *service;
+ pubsub_sender_t *publisher;
const char *topic;
-} *send_thread_struct_pt;
+};
+typedef struct send_thread_struct send_thread_struct_t;
-pubsub_sender_pt publisher_create(array_list_pt trackers, const char*
ident,long bundleId);
+pubsub_sender_t* publisher_create(array_list_pt trackers, const char*
ident,long bundleId);
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
+void publisher_start(pubsub_sender_t *client);
+void publisher_stop(pubsub_sender_t *client);
-void publisher_destroy(pubsub_sender_pt client);
+void publisher_destroy(pubsub_sender_t *client);
void publisher_publishSvcAdded(void * handle, void *svc, const
celix_properties_t *props);
void publisher_publishSvcRemoved(void * handle, void *svc, const
celix_properties_t *props);
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index 9b24e8d..ef48b9c 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -33,19 +33,19 @@ static const char * PUB_TOPICS[] = {
};
struct publisherActivator {
- pubsub_sender_pt client;
+ pubsub_sender_t *client;
array_list_pt trackerList;//List<service_tracker_pt>
};
static int pub_start(struct publisherActivator *act, celix_bundle_context_t
*ctx) {
const char *fwUUID =
celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
- if (fwUUID==NULL) {
+ if (fwUUID == NULL) {
printf("PUBLISHER: Cannot retrieve fwUUID.\n");
return CELIX_INVALID_BUNDLE_CONTEXT;
}
- bundle_t *bnd = celix_bundleContext_getBundle(ctx);
+ celix_bundle_t *bnd = celix_bundleContext_getBundle(ctx);
long bundleId = celix_bundle_getId(bnd);
act->trackerList = celix_arrayList_create();
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 0a726a2..ec0a241 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -37,94 +37,96 @@
#include "pubsub_publisher_private.h"
static double randCoordinate(double min, double max) {
-
- double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
-
+ double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min)));
return ret;
-
}
static void* send_thread(void* arg) {
+ send_thread_struct_t *st_struct = (send_thread_struct_t *) arg;
- send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
- pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher;
+ pubsub_publisher_t *publish_svc = st_struct->service;
+ pubsub_sender_t *publisher = st_struct->publisher;
char fwUUID[9];
- memset(fwUUID,0,9);
- memcpy(fwUUID,publisher->ident,8);
+ memset(fwUUID, 0, 9);
+ memcpy(fwUUID, publisher->ident, 8);
//poi_t point = calloc(1,sizeof(*point));
- location_t place = calloc(1,sizeof(*place));
+ location_t place = calloc(1, sizeof(*place));
- char* desc = calloc(64,sizeof(char));
- snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+ char *desc = calloc(64, sizeof(char));
+ snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned
long)pthread_self());
- char* name = calloc(64,sizeof(char));
- snprintf(name,64,"Bundle#%ld",publisher->bundleId);
+ char *name = calloc(64, sizeof(char));
+ snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
place->name = name;
place->description = desc;
place->extra = "extra value";
- printf("TOPIC : %s\n",st_struct->topic);
+ printf("TOPIC : %s\n", st_struct->topic);
+
unsigned int msgId = 0;
- if
(publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId)
== 0) {
- while (publisher->stop == false) {
- place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
- place->position.lon = randCoordinate(MIN_LON,MAX_LON);
- int nr_char = (int)randCoordinate(5,100000);
+ while (publisher->stop == false) {
+ if (msgId == 0) {
+ if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,
st_struct->topic, &msgId) != 0) {
+ printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",
MSG_POI_NAME);
+ }
+ }
+
+ if (msgId > 0) {
+ place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
+ place->position.lon = randCoordinate(MIN_LON, MAX_LON);
+ int nr_char = (int) randCoordinate(5, 100000);
place->data = calloc(nr_char, 1);
- for (int i = 0; i < (nr_char-1); i++) {
- place->data[i] = i%10 + '0';
+ for (int i = 0; i < (nr_char - 1); i++) {
+ place->data[i] = i % 10 + '0';
}
- place->data[nr_char-1] = '\0';
+ place->data[nr_char - 1] = '\0';
if (publish_svc->send) {
- if (publish_svc->send(publish_svc->handle, msgId,place) == 0) {
- printf("Sent %s [%f, %f] (%s, %s) data len =
%d\n",st_struct->topic, place->position.lat,
place->position.lon,place->name,place->description, nr_char);
+ if (publish_svc->send(publish_svc->handle, msgId, place) == 0)
{
+ printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",
st_struct->topic,
+ place->position.lat, place->position.lon,
place->name, place->description, nr_char);
}
} else {
printf("No send for %s\n", st_struct->topic);
}
free(place->data);
- sleep(2);
}
- } else {
- printf("PUBLISHER: Cannot retrieve msgId for message
'%s'\n",MSG_POI_NAME);
+ sleep(2);
}
+
free(place->description);
free(place->name);
free(place);
free(st_struct);
-
return NULL;
-
}
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char*
ident,long bundleId) {
- pubsub_sender_pt publisher = malloc(sizeof(*publisher));
+pubsub_sender_t* publisher_create(array_list_pt trackers,const char*
ident,long bundleId) {
+ pubsub_sender_t *publisher = malloc(sizeof(*publisher));
publisher->trackers = trackers;
publisher->ident = ident;
publisher->bundleId = bundleId;
publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
publisher->stop = false;
+
return publisher;
}
-void publisher_start(pubsub_sender_pt client) {
+void publisher_start(pubsub_sender_t *client) {
printf("PUBLISHER: starting up...\n");
}
-void publisher_stop(pubsub_sender_pt client) {
+void publisher_stop(pubsub_sender_t *client) {
printf("PUBLISHER: stopping...\n");
}
-void publisher_destroy(pubsub_sender_pt client) {
+void publisher_destroy(pubsub_sender_t *client) {
hashMap_destroy(client->tid_map, false, false);
client->trackers = NULL;
client->ident = NULL;
@@ -132,29 +134,32 @@ void publisher_destroy(pubsub_sender_pt client) {
}
void publisher_publishSvcAdded(void * handle, void *svc, const
celix_properties_t *props) {
- pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)svc;
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+ pubsub_publisher_t *publish_svc = (pubsub_publisher_t *) svc;
+ pubsub_sender_t *manager = (pubsub_sender_t *) handle;
manager->stop = false;
- printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
- send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
+
+ printf("PUBLISHER: new publish service exported (%s).\n", manager->ident);
+
+ send_thread_struct_t *data = calloc(1, sizeof(*data));
data->service = publish_svc;
data->publisher = manager;
data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC,
"!ERROR!");
celix_thread_t *tid = malloc(sizeof(*tid));
- celixThread_create(tid,NULL,send_thread,(void*)data);
+ celixThread_create(tid, NULL, send_thread, (void*)data);
hashMap_put(manager->tid_map, publish_svc, tid);
}
void publisher_publishSvcRemoved(void * handle, void *svc, const
celix_properties_t *props) {
- pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+ pubsub_sender_t *manager = (pubsub_sender_t *) handle;
celix_thread_t *tid = hashMap_get(manager->tid_map, svc);
manager->stop = true;
+
#if defined(__APPLE__) && defined(__MACH__)
uint64_t threadid;
pthread_threadid_np(tid->thread, &threadid);
printf("PUBLISHER: publish service unexporting (%s)
%llu!\n",manager->ident, threadid);
#else
- printf("PUBLISHER: publish service unexporting (%s)
%li!\n",manager->ident, tid->thread);
+ printf("PUBLISHER: publish service unexporting (%s) %li!\n",
manager->ident, tid->thread);
#endif
celixThread_join(*tid,NULL);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 2aa93a6..107325b 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -38,12 +38,12 @@ struct pubsub_receiver {
char *name;
};
-typedef struct pubsub_receiver* pubsub_receiver_pt;
+typedef struct pubsub_receiver pubsub_receiver_t;
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
+pubsub_receiver_t* subscriber_create(char* topics);
+void subscriber_start(pubsub_receiver_t* client);
+void subscriber_stop(pubsub_receiver_t* client);
+void subscriber_destroy(pubsub_receiver_t* client);
int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int
msgTypeId, void* msg, bool* release);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
index 24ebf36..315ebc6 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
@@ -40,7 +40,7 @@ static const char * SUB_TOPICS[] = {
struct subscriberActivator {
array_list_pt registrationList; //List<service_registration_pt>
- pubsub_subscriber_pt subsvc;
+ pubsub_subscriber_t *subsvc;
};
celix_status_t bundleActivator_create(bundle_context_pt context, void
**userData) {
@@ -53,8 +53,8 @@ celix_status_t bundleActivator_create(bundle_context_pt
context, void **userData
celix_status_t bundleActivator_start(void * userData, bundle_context_pt
context) {
struct subscriberActivator * act = (struct subscriberActivator *) userData;
- pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
- pubsub_receiver_pt sub = subscriber_create(SUB_NAME);
+ pubsub_subscriber_t *subsvc = calloc(1,sizeof(*subsvc));
+ pubsub_receiver_t *sub = subscriber_create(SUB_NAME);
subsvc->handle = sub;
subsvc->receive = pubsub_subscriber_recv;
@@ -76,7 +76,7 @@ celix_status_t bundleActivator_start(void * userData,
bundle_context_pt context)
arrayList_add(act->registrationList,reg);
}
- subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
+ subscriber_start((pubsub_receiver_t *) act->subsvc->handle);
return CELIX_SUCCESS;
}
@@ -91,7 +91,7 @@ celix_status_t bundleActivator_stop(void * userData,
bundle_context_pt context)
}
- subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
+ subscriber_stop((pubsub_receiver_t *) act->subsvc->handle);
return CELIX_SUCCESS;
}
@@ -101,7 +101,7 @@ celix_status_t bundleActivator_destroy(void * userData,
bundle_context_pt contex
struct subscriberActivator * act = (struct subscriberActivator *) userData;
act->subsvc->receive = NULL;
- subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
+ subscriber_destroy((pubsub_receiver_t *) act->subsvc->handle);
act->subsvc->handle = NULL;
free(act->subsvc);
act->subsvc = NULL;
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 2303fea..2474c50 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -30,22 +30,22 @@
#include "poi.h"
#include "pubsub_subscriber_private.h"
-pubsub_receiver_pt subscriber_create(char* topics) {
- pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
+pubsub_receiver_t* subscriber_create(char* topics) {
+ pubsub_receiver_t *sub = calloc(1,sizeof(*sub));
sub->name = strdup(topics);
return sub;
}
-void subscriber_start(pubsub_receiver_pt subscriber) {
+void subscriber_start(pubsub_receiver_t *subscriber) {
printf("Subscriber started...\n");
}
-void subscriber_stop(pubsub_receiver_pt subscriber) {
+void subscriber_stop(pubsub_receiver_t *subscriber) {
printf("Subscriber stopped...\n");
}
-void subscriber_destroy(pubsub_receiver_pt subscriber) {
+void subscriber_destroy(pubsub_receiver_t *subscriber) {
if (subscriber->name != NULL) {
free(subscriber->name);
}
@@ -54,10 +54,8 @@ void subscriber_destroy(pubsub_receiver_pt subscriber) {
}
int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int
msgTypeId, void* msg, bool* release) {
-
location_t place = (location_t)msg;
printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType,
place->position.lat, place->position.lon, place->name, place->description,
place->extra, (long)(strlen(place->data) + 1));
return 0;
-
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
index 28d7f74..fcd45cf 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
@@ -17,18 +17,16 @@
* under the License.
*/
+#include "pubsub_tcp_common.h"
+
#include <memory.h>
#include <assert.h>
#include <string.h>
#include <stdio.h>
+#include <stdint.h>
#include <unistd.h>
-#include "pubsub_psa_tcp_constants.h"
-#include "pubsub_tcp_common.h"
-int psa_tcp_localMsgTypeIdForMsgType(void* handle __attribute__((unused)),
const char* msgType, unsigned int* msgTypeId) {
- *msgTypeId = utils_stringHash(msgType);
- return 0;
-}
+#include "pubsub_psa_tcp_constants.h"
bool psa_tcp_checkVersion(version_pt msgVersion, const pubsub_tcp_msg_header_t
*hdr) {
bool check=false;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index b6f4d5a..feb97ca 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -46,7 +46,6 @@ typedef struct pubsub_tcp_endPointStore{
*/
-int psa_tcp_localMsgTypeIdForMsgType(void* handle, const char* msgType,
unsigned int* msgTypeId);
void psa_tcp_setScopeAndTopicFilter(const char* scope, const char *topic, char
*filter);
bool psa_tcp_checkVersion(version_pt msgVersion, const pubsub_tcp_msg_header_t
*hdr);
void psa_tcp_setupTcpContext(log_helper_t *logHelper, celix_thread_t *thread,
const celix_properties_t *topicProperties);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 79eaca1..83ff981 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -101,22 +101,17 @@ typedef struct psa_tcp_bounded_service_entry {
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+ hash_map_t *msgTypeIds; // key = msg name, value = msg type id
hash_map_t *msgEntries; //key = msg type id, value =
psa_tcp_send_msg_entry_t
int getCount;
} psa_tcp_bounded_service_entry_t;
-static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle,
- const celix_properties_t
*svcProperties);
-
-static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t
*requestingBundle,
- const celix_properties_t
*svcProperties);
-
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType,
unsigned int *msgTypeId);
+static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties);
+static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties);
static unsigned int rand_range(unsigned int min, unsigned int max);
-
static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t
*sender);
-
static void *psa_tcp_sendThread(void *data);
-
static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId,
const void *msg);
pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
@@ -326,6 +321,12 @@ void
pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, con
//TODO
}
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType,
unsigned int *msgTypeId) {
+ psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t
*) handle;
+ *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds,
msgType);
+ return 0;
+}
+
static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle,
const celix_properties_t
*svcProperties __attribute__((unused))) {
pubsub_tcp_topic_sender_t *sender = handle;
@@ -341,9 +342,9 @@ static void *psa_tcp_getPublisherService(void *handle,
const celix_bundle_t *req
entry->parent = sender;
entry->bndId = bndId;
entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+ entry->msgTypeIds = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
- int rc =
sender->serializer->createSerializerMap(sender->serializer->handle,
(celix_bundle_t *) requestingBundle,
- &entry->msgTypes);
+ int rc =
sender->serializer->createSerializerMap(sender->serializer->handle,
(celix_bundle_t *) requestingBundle, &entry->msgTypes);
if (rc == 0) {
hash_map_iterator_t iter =
hashMapIterator_construct(entry->msgTypes);
while (hashMapIterator_hasNext(&iter)) {
@@ -361,6 +362,7 @@ static void *psa_tcp_getPublisherService(void *handle,
const celix_bundle_t *req
uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
hashMap_put(entry->msgEntries, key, sendEntry);
+ hashMap_put(entry->msgTypeIds,
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t)
sendEntry->msgSer->msgId);
}
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType =
psa_tcp_localMsgTypeIdForMsgType;
@@ -400,6 +402,8 @@ static void psa_tcp_ungetPublisherService(void *handle,
const celix_bundle_t *re
free(msgEntry);
}
hashMap_destroy(entry->msgEntries, false, false);
+
+ hashMap_destroy(entry->msgTypeIds, true, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
index e75599d..7073b4f 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
@@ -19,11 +19,6 @@
#include "pubsub_udpmc_common.h"
-int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)),
const char* msgType, unsigned int* msgTypeId) {
- *msgTypeId = utils_stringHash(msgType);
- return 0;
-}
-
bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t
*hdr) {
bool check = false;
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
index 7528f4e..534a2ed 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
@@ -31,8 +31,6 @@ typedef struct pubsub_udp_msg_header {
} pubsub_udp_msg_header_t;
-int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)),
const char* msgType, unsigned int* msgTypeId);
-
bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t
*hdr);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index fd46062..e26fc31 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -458,8 +458,8 @@ static void
psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
} else {
int major = 0, minor = 0;
- version_getMajor(msgSer->msgVersion,&major);
- version_getMinor(msgSer->msgVersion,&minor);
+ version_getMajor(msgSer->msgVersion, &major);
+ version_getMinor(msgSer->msgVersion, &minor);
printf("[PSA_UDPMC] Version mismatch for primary message '%s'
(have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
}
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 12dfc30..1023005 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -66,6 +66,7 @@ typedef struct psa_udpmc_bounded_service_entry {
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes;
+ hash_map_t *msgTypeIds;
int getCount;
largeUdp_pt largeUdpHandle;
} psa_udpmc_bounded_service_entry_t;
@@ -76,6 +77,7 @@ typedef struct pubsub_msg {
char *payload;
} pubsub_udp_msg_t;
+static int psa_udpmc_localMsgTypeIdForMsgType(void* handle, const char*
msgType, unsigned int* msgTypeId);
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties);
static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties);
static int psa_udpmc_topicPublicationSend(void* handle, unsigned int
msgTypeId, const void *inMsg);
@@ -192,6 +194,12 @@ void
pubsub_udpmcTopicSender_disconnectFrom(pubsub_udpmc_topic_sender_t *sender,
//TODO
}
+static int psa_udpmc_localMsgTypeIdForMsgType(void *handle, const char
*msgType, unsigned int *msgTypeId) {
+ psa_udpmc_bounded_service_entry_t *entry =
(psa_udpmc_bounded_service_entry_t *) handle;
+ *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds,
msgType);
+ return 0;
+}
+
static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties
__attribute__((unused))) {
pubsub_udpmc_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
@@ -208,9 +216,16 @@ static void* psa_udpmc_getPublisherService(void *handle,
const celix_bundle_t *r
entry->parent = sender;
entry->bndId = bndId;
entry->largeUdpHandle = largeUdp_create(1);
+ entry->msgTypeIds = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
int rc =
sender->serializer->createSerializerMap(sender->serializer->handle,
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
if (rc == 0) {
+ hash_map_iterator_t iter =
hashMapIterator_construct(entry->msgTypes);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_msg_serializer_t *msgSer =
hashMapIterator_nextValue(&iter);
+ hashMap_put(entry->msgTypeIds, strndup(msgSer->msgName, 1024),
(void *)(uintptr_t) msgSer->msgId);
+ }
+
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType =
psa_udpmc_localMsgTypeIdForMsgType;
entry->service.send = psa_udpmc_topicPublicationSend;
@@ -244,6 +259,7 @@ static void psa_udpmc_ungetPublisherService(void *handle,
const celix_bundle_t *
fprintf(stderr, "Error destroying publisher service, serializer
not available / cannot get msg serializer map\n");
}
+ hashMap_destroy(entry->msgTypeIds, true, false);
largeUdp_destroy(entry->largeUdpHandle);
free(entry);
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
index ce85e40..8273c10 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
@@ -22,11 +22,6 @@
#include <stdio.h>
#include "pubsub_websocket_common.h"
-int psa_websocket_localMsgTypeIdForMsgType(void* handle
__attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
- *msgTypeId = utils_stringHash(msgType);
- return 0;
-}
-
bool psa_websocket_checkVersion(version_pt msgVersion, const
pubsub_websocket_msg_header_t *hdr) {
bool check=false;
int major=0,minor=0;
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
index 89ec65a..6ed6c83 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -46,7 +46,6 @@ struct pubsub_websocket_msg {
typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
-int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType,
unsigned int* msgTypeId);
void psa_websocket_setScopeAndTopicFilter(const char* scope, const char
*topic, char *filter);
char *psa_websocket_createURI(const char *scope, const char *topic);
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
index 42f66c7..4f80829 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -93,11 +93,12 @@ typedef struct psa_websocket_bounded_service_entry {
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+ hash_map_t *msgTypeIds; //key = msg name, value = msg type id
hash_map_t *msgEntries; //key = msg type id, value =
psa_websocket_send_msg_entry_t
int getCount;
} psa_websocket_bounded_service_entry_t;
-
+static int psa_websocket_localMsgTypeIdForMsgType(void* handle
__attribute__((unused)), const char* msgType, unsigned int* msgTypeId);
static void* psa_websocket_getPublisherService(void *handle, const
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void psa_websocket_ungetPublisherService(void *handle, const
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t
*sender);
@@ -229,6 +230,11 @@ const char*
pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
return sender->uri;
}
+static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char*
msgType, unsigned int* msgTypeId) {
+ psa_websocket_bounded_service_entry_t *entry =
(psa_websocket_bounded_service_entry_t *) handle;
+ *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds,
msgType);
+ return 0;
+}
static void* psa_websocket_getPublisherService(void *handle, const
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties
__attribute__((unused))) {
pubsub_websocket_topic_sender_t *sender = handle;
@@ -244,6 +250,7 @@ static void* psa_websocket_getPublisherService(void
*handle, const celix_bundle_
entry->parent = sender;
entry->bndId = bndId;
entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+ entry->msgTypeIds = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
int rc =
sender->serializer->createSerializerMap(sender->serializer->handle,
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
if (rc == 0) {
@@ -263,6 +270,7 @@ static void* psa_websocket_getPublisherService(void
*handle, const celix_bundle_
uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
hashMap_put(entry->msgEntries, key, sendEntry);
+ hashMap_put(entry->msgTypeIds,
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t)
sendEntry->msgSer->msgId);
}
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType =
psa_websocket_localMsgTypeIdForMsgType;
@@ -302,6 +310,7 @@ static void psa_websocket_ungetPublisherService(void
*handle, const celix_bundle
}
hashMap_destroy(entry->msgEntries, false, false);
+ hashMap_destroy(entry->msgTypeIds, true, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index 145798b..4608296 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -21,11 +21,6 @@
#include <assert.h>
#include "pubsub_zmq_common.h"
-int psa_zmq_localMsgTypeIdForMsgType(void* handle __attribute__((unused)),
const char* msgType, unsigned int* msgTypeId) {
- *msgTypeId = utils_stringHash(msgType);
- return 0;
-}
-
bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t
*hdr) {
bool check=false;
int major=0,minor=0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
index 6a72ce9..7918249 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
@@ -50,7 +50,6 @@ struct pubsub_zmq_msg_header {
typedef struct pubsub_zmq_msg_header pubsub_zmq_msg_header_t;
-int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType,
unsigned int* msgTypeId);
void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char
*filter);
bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t
*hdr);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 8166d4c..c7bb7ff 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -98,6 +98,7 @@ typedef struct psa_zmq_bounded_service_entry {
pubsub_publisher_t service;
long bndId;
hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+ hash_map_t *msgTypeIds; //key = msg name, value = msg type id
hash_map_t *msgEntries; //key = msg type id, value =
psa_zmq_send_msg_entry_t
int getCount;
} psa_zmq_bounded_service_entry_t;
@@ -344,6 +345,12 @@ void
pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender __at
/*nop*/
}
+static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType,
unsigned int* msgTypeId) {
+ psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t
*) handle;
+ *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds,
msgType);
+ return 0;
+}
+
static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t
*requestingBundle, const celix_properties_t *svcProperties
__attribute__((unused))) {
pubsub_zmq_topic_sender_t *sender = handle;
long bndId = celix_bundle_getId(requestingBundle);
@@ -358,6 +365,7 @@ static void* psa_zmq_getPublisherService(void *handle,
const celix_bundle_t *req
entry->parent = sender;
entry->bndId = bndId;
entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+ entry->msgTypeIds = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
int rc =
sender->serializer->createSerializerMap(sender->serializer->handle,
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
if (rc == 0) {
@@ -377,6 +385,7 @@ static void* psa_zmq_getPublisherService(void *handle,
const celix_bundle_t *req
uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
hashMap_put(entry->msgEntries, key, sendEntry);
+ hashMap_put(entry->msgTypeIds,
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t)
sendEntry->msgSer->msgId);
}
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType =
psa_zmq_localMsgTypeIdForMsgType;
@@ -385,9 +394,6 @@ static void* psa_zmq_getPublisherService(void *handle,
const celix_bundle_t *req
} else {
L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s",
sender->scope, sender->topic);
}
-
-
-
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -419,6 +425,7 @@ static void psa_zmq_ungetPublisherService(void *handle,
const celix_bundle_t *re
}
hashMap_destroy(entry->msgEntries, false, false);
+ hashMap_destroy(entry->msgTypeIds, true, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 96483d8..18eaf56 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -43,7 +43,7 @@
struct pubsub_publisher {
void *handle;
-
+
/**
* Every msg is identifiable by msg type string. Because msg type string
are performance wise not preferable (string compares),
* a "local" (int / platform dependent) unique id will be generated runtime
@@ -54,7 +54,7 @@ struct pubsub_publisher {
* Returns 0 on success.
*/
int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType,
unsigned int *msgTypeId);
-
+
/**
* send is a async function, but the msg can be safely deleted after send
returns.
* Returns 0 on success.
@@ -63,6 +63,5 @@ struct pubsub_publisher {
};
typedef struct pubsub_publisher pubsub_publisher_t;
-typedef struct pubsub_publisher* pubsub_publisher_pt;
#endif // __PUBSUB_PUBLISHER_H_
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 3debad9..7daf5d2 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -66,7 +66,6 @@ struct pubsub_subscriber_struct {
};
typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
-typedef struct pubsub_subscriber_struct* pubsub_subscriber_pt;
#endif // __PUBSUB_SUBSCRIBER_H_
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 5637937..618f9a4 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -67,12 +67,12 @@ typedef struct pubsub_json_msg_serializer_impl {
version_pt msgVersion;
} pubsub_json_msg_serializer_impl_t;
-static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
-static void pubsubSerializer_addMsgSerializerFromBundle(const char *root,
bundle_pt bundle, hash_map_pt msgTypesMap);
-static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt
msgTypesMap,bundle_pt bundle);
+static char* pubsubSerializer_getMsgDescriptionDir(celix_bundle_t *bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root,
celix_bundle_t *bundle, hash_map_pt msgSerializers);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt
msgSerializers,celix_bundle_t *bundle);
-static int pubsubMsgSerializer_convert_descriptor(FILE* file_ptr,
pubsub_msg_serializer_t* serializer);
-static int pubsubMsgSerializer_convert_avpr(FILE* file_ptr,
pubsub_msg_serializer_t* serializer, const char* fqn);
+static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr,
pubsub_msg_serializer_t* serializer);
+static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr,
pubsub_msg_serializer_t* serializer, const char* fqn);
static void dfi_log(void *handle, int level, const char *file, int line, const
char *msg, ...) {
va_list ap;
@@ -121,7 +121,7 @@ celix_status_t
pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
return status;
}
-celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt
bundle, hash_map_pt* serializerMap) {
+celix_status_t pubsubSerializer_createSerializerMap(void *handle,
celix_bundle_t *bundle, hash_map_pt* serializerMap) {
celix_status_t status = CELIX_SUCCESS;
pubsub_json_serializer_t *serializer = handle;
@@ -207,7 +207,7 @@ void pubsubMsgSerializer_freeMsg(void* handle, void *msg) {
}
-static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers,
bundle_pt bundle) {
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers,
celix_bundle_t *bundle) {
char* root = NULL;
char* metaInfPath = NULL;
@@ -224,8 +224,7 @@ static void
pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bu
}
}
-static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
-{
+static char* pubsubSerializer_getMsgDescriptionDir(celix_bundle_t *bundle) {
char *root = NULL;
bool isSystemBundle = false;
@@ -251,9 +250,7 @@ static char*
pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
return root;
}
-
-static void pubsubSerializer_addMsgSerializerFromBundle(const char *root,
bundle_pt bundle, hash_map_pt msgSerializers)
-{
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root,
celix_bundle_t *bundle, hash_map_pt msgSerializers) {
char fqn[MAX_PATH_LEN];
char path[MAX_PATH_LEN];
const char* entry_name = NULL;
@@ -282,10 +279,10 @@ static void
pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
int translation_result = -1;
if (fileInputType == FIT_DESCRIPTOR) {
- translation_result =
pubsubMsgSerializer_convert_descriptor(stream, msgSerializer);
+ translation_result = pubsubMsgSerializer_convertDescriptor(stream,
msgSerializer);
}
else if (fileInputType == FIT_AVPR) {
- translation_result = pubsubMsgSerializer_convert_avpr(stream,
msgSerializer, fqn);
+ translation_result = pubsubMsgSerializer_convertAvpr(stream,
msgSerializer, fqn);
}
fclose(stream);
@@ -317,6 +314,7 @@ static void
pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
closedir(dir);
}
}
+
static FILE* openFileStream(FILE_INPUT_TYPE file_input_type, const char*
filename, const char* root, char* avpr_fqn, char* path) {
FILE* result = NULL;
memset(path, 0, MAX_PATH_LEN);
@@ -394,15 +392,15 @@ static bool readPropertiesFile(const char*
properties_file_name, const char* roo
return true;
}
-static int pubsubMsgSerializer_convert_descriptor(FILE* file_ptr,
pubsub_msg_serializer_t* serializer) {
- dyn_message_type* msgType = NULL;
+static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr,
pubsub_msg_serializer_t* serializer) {
+ dyn_message_type *msgType = NULL;
int rc = dynMessage_parse(file_ptr, &msgType);
if (rc != 0 || msgType == NULL) {
printf("DMU: cannot parse message from descriptor.\n");
return -1;
}
- char* msgName = NULL;
+ char *msgName = NULL;
rc += dynMessage_getName(msgType, &msgName);
version_pt msgVersion = NULL;
@@ -413,11 +411,25 @@ static int pubsubMsgSerializer_convert_descriptor(FILE*
file_ptr, pubsub_msg_ser
return -1;
}
- dyn_type * type = NULL;
+ dyn_type *type = NULL;
dynMessage_getMessageType(msgType, &type);
- unsigned int msgId = utils_stringHash(msgName);
- pubsub_json_msg_serializer_impl_t * handle =
(pubsub_json_msg_serializer_impl_t*)serializer->handle;
+ unsigned int msgId = 0;
+
+ char *msgIdStr = NULL;
+ int rv = dynMessage_getAnnotationEntry(msgType, "msgId", &msgIdStr);
+ if (rv == CELIX_SUCCESS && msgIdStr != NULL) {
+ // custom msg id passed, use it
+ long customMsgId = strtol(msgIdStr, NULL, 10);
+ if (customMsgId > 0)
+ msgId = (unsigned int) customMsgId;
+ }
+
+ if (msgId == 0) {
+ msgId = utils_stringHash(msgName);
+ }
+
+ pubsub_json_msg_serializer_impl_t *handle =
(pubsub_json_msg_serializer_impl_t*)serializer->handle;
handle->type = type;
handle->msgId = msgId;
handle->msgName = msgName;
@@ -434,16 +446,16 @@ static int pubsubMsgSerializer_convert_descriptor(FILE*
file_ptr, pubsub_msg_ser
return 0;
}
-static int pubsubMsgSerializer_convert_avpr(FILE* file_ptr,
pubsub_msg_serializer_t* serializer, const char* fqn) {
+static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr,
pubsub_msg_serializer_t* serializer, const char* fqn) {
if (!file_ptr || !fqn || !serializer) return -2;
- dyn_type* type = dynType_parseAvpr(file_ptr, fqn);
+ dyn_type *type = dynType_parseAvpr(file_ptr, fqn);
if (!type) {
printf("DMU: cannot parse avpr file for '%s'\n", fqn);
return -1;
}
- const char* msgName = dynType_getName(type);
+ const char *msgName = dynType_getName(type);
version_pt msgVersion = NULL;
celix_status_t s =
version_createVersionFromString(dynType_getMetaInfo(type, "version"),
&msgVersion);
@@ -456,8 +468,20 @@ static int pubsubMsgSerializer_convert_avpr(FILE*
file_ptr, pubsub_msg_serialize
return -1;
}
- unsigned int msgId = utils_stringHash(msgName);
- pubsub_json_msg_serializer_impl_t * handle =
(pubsub_json_msg_serializer_impl_t*) serializer->handle;
+ unsigned int msgId = 0;
+ const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+ if (msgIdStr != NULL) {
+ // custom msg id passed, use it
+ long customMsgId = strtol(msgIdStr, NULL, 10);
+ if (customMsgId > 0)
+ msgId = (unsigned int) customMsgId;
+ }
+
+ if (msgId == 0) {
+ msgId = utils_stringHash(msgName);
+ }
+
+ pubsub_json_msg_serializer_impl_t *handle =
(pubsub_json_msg_serializer_impl_t*) serializer->handle;
handle->type = type;
handle->msgId = msgId;
handle->msgName = msgName;
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
index f44f6b2..9a850f2 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -34,7 +34,7 @@ typedef struct pubsub_json_serializer
pubsub_json_serializer_t;
celix_status_t pubsubSerializer_create(bundle_context_pt context,
pubsub_json_serializer_t **serializer);
celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer);
-celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt
bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_createSerializerMap(void *handle,
celix_bundle_t *bundle, hash_map_pt* serializerMap);
celix_status_t pubsubSerializer_destroySerializerMap(void *handle, hash_map_pt
serializerMap);
#endif /* PUBSUB_SERIALIZER_JSON_H_ */
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt
b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index 509c760..ffb9625 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -27,7 +27,7 @@ add_library(pubsub_spi STATIC
src/pubsub_utils_match.c
src/pubsub_endpoint.c
src/pubsub_utils.c
- src/pubsub_admin_metrics.c
+ src/pubsub_admin_metrics.c
)
target_include_directories(pubsub_spi SYSTEM PRIVATE ${UUID_INCLUDE_DIRS})
set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 7c80745..39c1841 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -40,7 +40,7 @@ extern "C" {
*/
celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char
**topic, char **scope);
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx);
double pubsub_utils_matchPublisher(
celix_bundle_context_t *ctx,
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
index b6ed882..4689076 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -89,8 +89,7 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char*
filterstr, char **topi
*
* Caller is responsible for freeing the object
*/
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
-{
+char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx) {
array_list_pt bundles = NULL;
bundleContext_getBundles(ctx, &bundles);
int nrOfBundles = arrayList_size(bundles);
@@ -98,7 +97,7 @@ char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
char* result = NULL;
for (int i = 0; i < nrOfBundles; i++) {
- bundle_pt b = arrayList_get(bundles, i);
+ celix_bundle_t *b = arrayList_get(bundles, i);
/* Skip bundle 0 (framework bundle) since it has no path nor revisions
*/
bundle_getBundleId(b, &bundle_id);
@@ -130,23 +129,20 @@ char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
}
celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t
*bundle, const char *topic, bool isPublisher) {
-
celix_properties_t *topic_props = NULL;
bool isSystemBundle = false;
- bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
+ bundle_isSystemBundle((celix_bundle_t *)bundle, &isSystemBundle);
long bundleId = -1;
- bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
- bundle_getBundleId((bundle_pt)bundle,&bundleId);
+ bundle_isSystemBundle((celix_bundle_t *)bundle, &isSystemBundle);
+ bundle_getBundleId((celix_bundle_t *)bundle,&bundleId);
if (isSystemBundle == false) {
-
char *bundleRoot = NULL;
- char* topicPropertiesPath = NULL;
- bundle_getEntry((bundle_pt)bundle, ".", &bundleRoot);
+ char *topicPropertiesPath = NULL;
+ bundle_getEntry((celix_bundle_t *)bundle, ".", &bundleRoot);
if (bundleRoot != NULL) {
-
asprintf(&topicPropertiesPath,
"%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher? "pub":"sub",
topic);
topic_props = celix_properties_load(topicPropertiesPath);
if (topic_props == NULL) {
@@ -159,4 +155,4 @@ celix_properties_t *pubsub_utils_getTopicProperties(const
celix_bundle_t *bundle
}
return topic_props;
-}
\ No newline at end of file
+}