http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/CMakeLists.txt 
b/pubsub/pubsub_discovery/CMakeLists.txt
index 0e7d6c5..f92f81c 100644
--- a/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/pubsub/pubsub_discovery/CMakeLists.txt
@@ -18,7 +18,7 @@
 find_package(CURL REQUIRED)
 find_package(Jansson REQUIRED)
 
-add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+add_bundle(celix_pubsub_discovery_etcd
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
     VERSION "1.0.0"
     SOURCES
@@ -27,16 +27,15 @@ 
add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
         src/etcd_common.c
         src/etcd_watcher.c
         src/etcd_writer.c
-               
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-               
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )
 
-target_include_directories(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 PRIVATE
+target_include_directories(celix_pubsub_discovery_etcd PRIVATE
     src
-    include
        ${CURL_INCLUDE_DIR}
        ${JANSSON_INCLUDE_DIR}
-        )
+)
+
+target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi 
Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+install_bundle(celix_pubsub_discovery_etcd)
 
-target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery 
PRIVATE Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} 
${JANSSON_LIBRARIES})
-install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)
+add_library(Celix::pubsub_discovery_etcd ALIAS celix_pubsub_discovery_etcd)

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt 
b/pubsub/pubsub_serializer_json/CMakeLists.txt
index b86f30e..1269cad 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -18,22 +18,23 @@
 find_package(Jansson REQUIRED)
 
 
-add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
+add_bundle(celix_pubsub_serializer_json
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_serializer_json"
     VERSION "1.0.0"
     SOURCES
                src/ps_activator.c
                src/pubsub_serializer_impl.c
-       ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )
 
-target_include_directories(org.apache.celix.pubsub_serializer.PubSubSerializerJson
 PRIVATE
+target_include_directories(celix_pubsub_serializer_json PRIVATE
        src
        ${JANSSON_INCLUDE_DIR}
 )
 
-set_target_properties(org.apache.celix.pubsub_serializer.PubSubSerializerJson 
PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(org.apache.celix.pubsub_serializer.PubSubSerializerJson 
PRIVATE Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper)
+set_target_properties(celix_pubsub_serializer_json PROPERTIES INSTALL_RPATH 
"$ORIGIN")
+target_link_libraries(celix_pubsub_serializer_json PRIVATE Celix::pubsub_spi 
Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper)
 
-install_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson)
+install_bundle(celix_pubsub_serializer_json)
+
+add_library(Celix::pubsub_serializer_json ALIAS celix_pubsub_serializer_json)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/CMakeLists.txt b/pubsub/pubsub_spi/CMakeLists.txt
new file mode 100644
index 0000000..118dd3a
--- /dev/null
+++ b/pubsub/pubsub_spi/CMakeLists.txt
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(celix_pubsub_spi STATIC
+               src/pubsub_admin_match.c
+        src/pubsub_endpoint.c
+        src/pubsub_utils.c
+)
+target_include_directories(celix_pubsub_spi PUBLIC
+               $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
+               $<INSTALL_INTERFACE:include/celix/pubsub_spi>
+)
+target_link_libraries(celix_pubsub_spi PUBLIC Celix::framework 
Celix::pubsub_api)
+
+set_target_properties(celix_pubsub_spi PROPERTIES TOPIC_INFO_DESCRIPTOR 
${CMAKE_CURRENT_LIST_DIR}/include/pubsub_topic_info.descriptor)
+#TODO how to make this descriptor available for imported targets? 
$<INSTALL_INTERFACE:include/celix/pubsub_spi/pubsub_topic_info.descriptor>
+
+add_library(Celix::pubsub_spi ALIAS celix_pubsub_spi)
+
+install(TARGETS celix_pubsub_spi EXPORT celix DESTINATION 
${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
+install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT 
pubsub)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h 
b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
new file mode 100644
index 0000000..bd39fc0
--- /dev/null
+++ b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
@@ -0,0 +1,36 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_
+#define PUBLISHER_ENDPOINT_ANNOUNCE_H_
+
+#include "pubsub_endpoint.h"
+
+struct publisher_endpoint_announce {
+       void *handle;
+       celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt 
pubEP);
+       celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt 
pubEP);
+       celix_status_t (*interestedInTopic)(void* handle, const char *scope, 
const char *topic);
+       celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, 
const char *topic);
+};
+
+typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt;
+
+
+#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin.h 
b/pubsub/pubsub_spi/include/pubsub_admin.h
new file mode 100644
index 0000000..f24d825
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -0,0 +1,72 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin.h
+ *
+ *  \date       Sep 30, 2011
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_H_
+#define PUBSUB_ADMIN_H_
+
+#include "service_reference.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+
+#define PSA_IP         "PSA_IP"
+#define PSA_ITF        "PSA_INTERFACE"
+#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+
+#define PUBSUB_ADMIN_TYPE_KEY  "pubsub_admin.type"
+
+typedef struct pubsub_admin *pubsub_admin_pt;
+
+struct pubsub_admin_service {
+       pubsub_admin_pt admin;
+
+       celix_status_t (*addSubscription)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+       celix_status_t (*removeSubscription)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+
+       celix_status_t (*addPublication)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+       celix_status_t (*removePublication)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+
+       celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* 
scope, char* topic);
+       celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* 
scope, char* topic);
+
+       /* Match principle:
+        * - A full matching pubsub_admin gives 200 points
+        * - A full matching serializer gives 100 points
+        * - If QoS = sample
+        *              - fallback pubsub_admin order of selection is: udp_mc, 
zmq. Points allocation is 100,75.
+        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
+        * - If QoS = control
+        *              - fallback pubsub_admin order of selection is: 
zmq,udp_mc. Points allocation is 100,75.
+        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
+        * - If nothing is specified, QoS = sample is assumed, so the same 
score applies, just divided by two.
+        *
+        */
+       celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
+};
+
+typedef struct pubsub_admin_service *pubsub_admin_service_pt;
+
+#endif /* PUBSUB_ADMIN_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin_match.h 
b/pubsub/pubsub_spi/include/pubsub_admin_match.h
new file mode 100644
index 0000000..e95ca7d
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_admin_match.h
@@ -0,0 +1,40 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#ifndef PUBSUB_ADMIN_MATCH_H_
+#define PUBSUB_ADMIN_MATCH_H_
+
+#include "celix_errno.h"
+#include "properties.h"
+#include "array_list.h"
+
+#include "pubsub_serializer.h"
+
+#define QOS_ATTRIBUTE_KEY      "attribute.qos"
+#define QOS_TYPE_SAMPLE                "sample"        /* A.k.a. unreliable 
connection */
+#define QOS_TYPE_CONTROL       "control"       /* A.k.a. reliable connection */
+
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE  200.0F
+#define SERIALIZER_FULL_MATCH_SCORE            100.0F
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
+
+#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_common.h 
b/pubsub/pubsub_spi/include/pubsub_common.h
new file mode 100644
index 0000000..5dfd8fd
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_common.h
@@ -0,0 +1,52 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_common.h
+ *
+ *  \date       Sep 17, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_COMMON_H_
+#define PUBSUB_COMMON_H_
+
+#define PUBSUB_SERIALIZER_SERVICE              "pubsub_serializer"
+#define PUBSUB_ADMIN_SERVICE                   "pubsub_admin"
+#define PUBSUB_DISCOVERY_SERVICE               "pubsub_discovery"
+#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
+
+#define PUBSUB_ANY_SUB_TOPIC                   "any"
+
+#define        PUBSUB_BUNDLE_ID                        "bundle.id"
+
+#define MAX_SCOPE_LEN                           1024
+#define MAX_TOPIC_LEN                          1024
+
+struct pubsub_msg_header{
+       char topic[MAX_TOPIC_LEN];
+       unsigned int type;
+       unsigned char major;
+       unsigned char minor;
+};
+
+typedef struct pubsub_msg_header* pubsub_msg_header_pt;
+
+
+#endif /* PUBSUB_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_endpoint.h 
b/pubsub/pubsub_spi/include/pubsub_endpoint.h
new file mode 100644
index 0000000..4c39d2f
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -0,0 +1,58 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_endpoint.h
+ *
+ *  \date       Sep 21, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ENDPOINT_H_
+#define PUBSUB_ENDPOINT_H_
+
+#include "service_reference.h"
+#include "listener_hook_service.h"
+#include "properties.h"
+
+#include "pubsub/publisher.h"
+#include "pubsub/subscriber.h"
+
+struct pubsub_endpoint {
+    char *frameworkUUID;
+    char *scope;
+    char *topic;
+    long serviceID;
+    char* endpoint;
+    bool is_secure;
+    properties_pt topic_props;
+};
+
+typedef struct pubsub_endpoint *pubsub_endpoint_pt;
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
+
+char *createScopeTopicKey(const char* scope, const char* topic);
+
+#endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_serializer.h 
b/pubsub/pubsub_spi/include/pubsub_serializer.h
new file mode 100644
index 0000000..4489fa4
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -0,0 +1,66 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer.h
+ *
+ *  \date       Mar 24, 2017
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
+
+#include "service_reference.h"
+#include "hash_map.h"
+
+#include "pubsub_common.h"
+
+#define PUBSUB_SERIALIZER_TYPE_KEY     "pubsub_serializer.type"
+
+/**
+ * There should be a pubsub_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a serializer_map per bundle. Potentially using
+ * the extender pattern.
+ */
+
+typedef struct pubsub_msg_serializer {
+       void* handle;
+       unsigned int msgId;
+       const char* msgName;
+       version_pt msgVersion;
+
+       celix_status_t (*serialize)(void* handle, const void* input, void** 
out, size_t* outLen);
+       celix_status_t (*deserialize)(void* handle, const void* input, size_t 
inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+       void (*freeMsg)(void* handle, void* msg);
+
+} pubsub_msg_serializer_t;
+
+typedef struct pubsub_serializer_service {
+       void* handle;
+
+       celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, 
hash_map_pt* serializerMap);
+       celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt 
serializerMap);
+
+} pubsub_serializer_service_t;
+
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor 
b/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
new file mode 100644
index 0000000..c01a2fd
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
@@ -0,0 +1,10 @@
+:header
+type=interface
+name=pubsub_topic_info
+version=1.0.0
+:annotations
+:types
+:methods
+getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N
+getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N
+getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_utils.h 
b/pubsub/pubsub_spi/include/pubsub_utils.h
new file mode 100644
index 0000000..aff5c72
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_UTILS_H_
+#define PUBSUB_UTILS_H_
+
+#include "bundle_context.h"
+#include "array_list.h"
+
+char* pubsub_getScopeFromFilter(char* bundle_filter);
+char* pubsub_getTopicFromFilter(char* bundle_filter);
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+array_list_pt pubsub_getTopicsFromString(char* string);
+
+
+#endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_admin_match.c 
b/pubsub/pubsub_spi/src/pubsub_admin_match.c
new file mode 100644
index 0000000..2a695c1
--- /dev/null
+++ b/pubsub/pubsub_spi/src/pubsub_admin_match.c
@@ -0,0 +1,320 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <string.h>
+#include "service_reference.h"
+
+#include "pubsub_admin.h"
+
+#include "pubsub_admin_match.h"
+
+#define KNOWN_PUBSUB_ADMIN_NUM 2
+#define KNOWN_SERIALIZER_NUM   2
+
+static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"udp_mc","zmq"};
+static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
+
+static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"zmq","udp_mc"};
+static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
+
+static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
+static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
+
+static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType);
+static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService);
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score){
+
+       celix_status_t status = CELIX_SUCCESS;
+       double final_score = 0;
+       int i = 0, j = 0;
+
+       const char *requested_admin_type                = NULL;
+       const char *requested_serializer_type   = NULL;
+       const char *requested_qos_type                  = NULL;
+
+       if(endpoint_props!=NULL){
+               requested_admin_type            = 
properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
+               requested_serializer_type       = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+               requested_qos_type                      = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+       }
+
+       /* Analyze the pubsub_admin */
+       if(requested_admin_type != NULL){ /* We got precise specification on 
the pubsub_admin we want */
+               
if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){
 //Full match
+                       final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
+               }
+       }
+       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected PSA */
+               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+                               
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+                                       final_score += 
qos_pubsub_admin_score[i];
+                                       break;
+                               }
+                       }
+               }
+               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+                               
if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+                                       final_score += 
qos_pubsub_admin_score[i];
+                                       break;
+                               }
+                       }
+               }
+               else{
+                       printf("Unknown QoS type '%s'\n",requested_qos_type);
+                       status = CELIX_ILLEGAL_ARGUMENT;
+               }
+       }
+       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
+               for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+                       
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+                               final_score += (qos_pubsub_admin_score[i]/2);
+                               break;
+                       }
+               }
+       }
+
+       char *serializer_type = NULL;
+       /* Analyze the serializers */
+       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
+               for(i=0;i<arrayList_size(serializerList);i++){
+                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
+                       get_serializer_type(svcRef, &serializer_type);
+                       if(serializer_type != NULL){
+                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+                                       final_score += 
SERIALIZER_FULL_MATCH_SCORE;
+                                       break;
+                               }
+                       }
+               }
+       }
+       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
+               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       ser_found = true;
+                                               }
+                                       }
+                               }
+                               if(ser_found){
+                                       final_score += qos_serializer_score[i];
+                               }
+                       }
+               }
+               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       ser_found = true;
+                                               }
+                                       }
+                               }
+                               if(ser_found){
+                                       final_score += qos_serializer_score[i];
+                               }
+                       }
+               }
+               else{
+                       printf("Unknown QoS type '%s'\n",requested_qos_type);
+                       status = CELIX_ILLEGAL_ARGUMENT;
+               }
+       }
+       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
+               bool ser_found = false;
+               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                               service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                               get_serializer_type(svcRef, &serializer_type);
+                               if(serializer_type != NULL){
+                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                               ser_found = true;
+                                       }
+                               }
+                       }
+                       if(ser_found){
+                               final_score += (qos_serializer_score[i]/2);
+                       }
+               }
+       }
+
+       *score = final_score;
+
+       printf("Score for pair <%s,%s> = 
%f\n",pubsub_admin_type,serializer_type,final_score);
+
+       return status;
+}
+
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
+       celix_status_t status = CELIX_SUCCESS;
+
+       int i = 0, j = 0;
+
+       const char *requested_serializer_type = NULL;
+       const char *requested_qos_type = NULL;
+
+       if (endpoint_props != NULL){
+               requested_serializer_type = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+               requested_qos_type = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+       }
+
+       service_reference_pt svcRef = NULL;
+       void *svc = NULL;
+
+       /* Analyze the serializers */
+       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
+               for(i=0;i<arrayList_size(serializerList);i++){
+                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
+                       char *serializer_type = NULL;
+                       get_serializer_type(svcRef, &serializer_type);
+                       if(serializer_type != NULL){
+                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+                                       manage_service_from_reference(svcRef, 
&svc,true);
+                                       if(svc==NULL){
+                                               printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                               status = 
CELIX_SERVICE_EXCEPTION;
+                                       }
+                                       *serSvc = svc;
+                                       break;
+                               }
+                       }
+               }
+       }
+       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
+               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       char *serializer_type = NULL;
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       
manage_service_from_reference(svcRef, &svc,true);
+                                                       if(svc==NULL){
+                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                                               status = 
CELIX_SERVICE_EXCEPTION;
+                                                       }
+                                                       else{
+                                                               *serSvc = svc;
+                                                               ser_found = 
true;
+                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+                       bool ser_found = false;
+                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                                       char *serializer_type = NULL;
+                                       get_serializer_type(svcRef, 
&serializer_type);
+                                       if(serializer_type != NULL){
+                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                                       
manage_service_from_reference(svcRef, &svc,true);
+                                                       if(svc==NULL){
+                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                                               status = 
CELIX_SERVICE_EXCEPTION;
+                                                       }
+                                                       else{
+                                                               *serSvc = svc;
+                                                               ser_found = 
true;
+                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+               else{
+                       printf("Unknown QoS type '%s'\n",requested_qos_type);
+                       status = CELIX_ILLEGAL_ARGUMENT;
+               }
+       }
+       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
+               bool ser_found = false;
+               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
+                               svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
+                               char *serializer_type = NULL;
+                               get_serializer_type(svcRef, &serializer_type);
+                               if(serializer_type != NULL){
+                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+                                               
manage_service_from_reference(svcRef, &svc,true);
+                                               if(svc==NULL){
+                                                       printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
+                                                       status = 
CELIX_SERVICE_EXCEPTION;
+                                               }
+                                               else{
+                                                       *serSvc = svc;
+                                                       ser_found = true;
+                                                       printf("Selected %s 
serializer as best without any 
specification\n",qos_sample_serializer_prio_list[i]);
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       if(svc!=NULL && svcRef!=NULL){
+               manage_service_from_reference(svcRef, svc, false);
+       }
+
+       return status;
+}
+
+static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType){
+
+       const char *serType = NULL;
+       serviceReference_getProperty(svcRef, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType != NULL){
+               *serializerType = (char*)serType;
+       }
+       else{
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",svcRef);
+               *serializerType = NULL;
+       }
+}
+
+static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService){
+       bundle_context_pt context = NULL;
+       bundle_pt bundle = NULL;
+       serviceReference_getBundle(svcRef, &bundle);
+       bundle_getContext(bundle, &context);
+       if(getService){
+               bundleContext_getService(context, svcRef, svc);
+       }
+       else{
+               bundleContext_ungetService(context, svcRef, NULL);
+       }
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c 
b/pubsub/pubsub_spi/src/pubsub_endpoint.c
new file mode 100644
index 0000000..c3fd293
--- /dev/null
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -0,0 +1,254 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description.c
+ *
+ *  \date       25 Jul 2014
+ *  \author     <a href="mailto:[email protected]";>Apache Celix Project 
Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "celix_errno.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "constants.h"
+
+#include "pubsub_utils.h"
+
+
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps);
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher);
+
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps){
+
+       if (fwUUID != NULL) {
+               psEp->frameworkUUID = strdup(fwUUID);
+       }
+
+       if (scope != NULL) {
+               psEp->scope = strdup(scope);
+       }
+
+       if (topic != NULL) {
+               psEp->topic = strdup(topic);
+       }
+
+       psEp->serviceID = serviceId;
+
+       if(endpoint != NULL) {
+               psEp->endpoint = strdup(endpoint);
+       }
+
+       if(topic_props != NULL){
+               if(cloneProps){
+                       properties_copy(topic_props, &(psEp->topic_props));
+               }
+               else{
+                       psEp->topic_props = topic_props;
+               }
+       }
+}
+
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher){
+
+       properties_pt topic_props = NULL;
+
+       bool isSystemBundle = false;
+       bundle_isSystemBundle(bundle, &isSystemBundle);
+       long bundleId = -1;
+       bundle_isSystemBundle(bundle, &isSystemBundle);
+       bundle_getBundleId(bundle,&bundleId);
+
+       if(isSystemBundle == false) {
+
+               char *bundleRoot = NULL;
+               char* topicPropertiesPath = NULL;
+               bundle_getEntry(bundle, ".", &bundleRoot);
+
+               if(bundleRoot != NULL){
+
+                       asprintf(&topicPropertiesPath, 
"%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", 
topic);
+                       topic_props = properties_load(topicPropertiesPath);
+                       if(topic_props==NULL){
+                               printf("PSEP: Could not load properties for %s 
on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", 
topic,bundleId);
+                       }
+
+                       free(topicPropertiesPath);
+                       free(bundleRoot);
+               }
+       }
+
+       return topic_props;
+}
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp){
+       celix_status_t status = CELIX_SUCCESS;
+
+       *psEp = calloc(1, sizeof(**psEp));
+
+       pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, 
endpoint, topic_props, true);
+
+       return status;
+
+}
+
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out){
+       celix_status_t status = CELIX_SUCCESS;
+
+       *out = calloc(1,sizeof(**out));
+
+       pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, 
in->serviceID, in->endpoint, in->topic_props, true);
+
+       return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference, pubsub_endpoint_pt* psEp, bool isPublisher){
+       celix_status_t status = CELIX_SUCCESS;
+
+       pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
+
+       bundle_pt bundle = NULL;
+       bundle_context_pt ctxt = NULL;
+       const char* fwUUID = NULL;
+       serviceReference_getBundle(reference,&bundle);
+       bundle_getContext(bundle,&ctxt);
+       bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+       const char* scope = NULL;
+       serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
+
+       const char* topic = NULL;
+       serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
+
+       const char* serviceId = NULL;
+       
serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
+
+       /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
+       properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
+
+       pubsubEndpoint_setFields(ep, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, 
strtol(serviceId,NULL,10), NULL, topic_props, false);
+
+       if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) {
+               fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: 
incomplete description!.");
+               status = CELIX_BUNDLE_EXCEPTION;
+               pubsubEndpoint_destroy(ep);
+               *psEp = NULL;
+       }
+       else{
+               *psEp = ep;
+       }
+
+       return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher){
+       celix_status_t status = CELIX_SUCCESS;
+
+       const char* fwUUID=NULL;
+       
bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+       if(fwUUID==NULL){
+               return CELIX_BUNDLE_EXCEPTION;
+       }
+
+       char* topic = pubsub_getTopicFromFilter(info->filter);
+       if(topic==NULL){
+               return CELIX_BUNDLE_EXCEPTION;
+       }
+
+       *psEp = calloc(1, sizeof(**psEp));
+
+       char* scope = pubsub_getScopeFromFilter(info->filter);
+       if(scope == NULL) {
+               scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+       }
+
+       bundle_pt bundle = NULL;
+       long bundleId = -1;
+       bundleContext_getBundle(info->context,&bundle);
+
+       bundle_getBundleId(bundle,&bundleId);
+
+       properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
+
+       /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
+       pubsubEndpoint_setFields(*psEp, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, 
topic_props, false);
+
+       free(topic);
+       free(scope);
+
+
+       return status;
+}
+
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+
+       if(psEp->frameworkUUID!=NULL){
+               free(psEp->frameworkUUID);
+               psEp->frameworkUUID = NULL;
+       }
+
+       if(psEp->scope!=NULL){
+               free(psEp->scope);
+               psEp->scope = NULL;
+       }
+
+       if(psEp->topic!=NULL){
+               free(psEp->topic);
+               psEp->topic = NULL;
+       }
+
+       if(psEp->endpoint!=NULL){
+               free(psEp->endpoint);
+               psEp->endpoint = NULL;
+       }
+
+       if(psEp->topic_props != NULL){
+               properties_destroy(psEp->topic_props);
+       }
+
+       free(psEp);
+
+       return CELIX_SUCCESS;
+
+}
+
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
+
+       return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
+                       (strcmp(psEp1->scope,psEp2->scope)==0) &&
+                       (strcmp(psEp1->topic,psEp2->topic)==0) &&
+                       (psEp1->serviceID == psEp2->serviceID) /*&&
+                       ((psEp1->endpoint==NULL && 
psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
+       );
+}
+
+char *createScopeTopicKey(const char* scope, const char* topic) {
+       char *result = NULL;
+       asprintf(&result, "%s:%s", scope, topic);
+
+       return result;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c 
b/pubsub/pubsub_spi/src/pubsub_utils.c
new file mode 100644
index 0000000..19b2271
--- /dev/null
+++ b/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -0,0 +1,170 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.c
+ *
+ *  \date       Sep 24, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "constants.h"
+
+#include "pubsub_common.h"
+#include "pubsub/publisher.h"
+#include "pubsub_utils.h"
+
+#include "array_list.h"
+#include "bundle.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define MAX_KEYBUNDLE_LENGTH 256
+
+char* pubsub_getScopeFromFilter(char* bundle_filter){
+
+       char* scope = NULL;
+
+       char* filter = strdup(bundle_filter);
+
+       char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+       if(oc!=NULL){
+               oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+               
if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+                       char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
+                       if(scopes!=NULL){
+
+                               scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
+                               char* bottom=strchr(scopes,')');
+                               *bottom='\0';
+
+                               scope=strdup(scopes);
+                       } else {
+                           scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+                       }
+               }
+       }
+
+       free(filter);
+
+       return scope;
+}
+
+char* pubsub_getTopicFromFilter(char* bundle_filter){
+
+       char* topic = NULL;
+
+       char* filter = strdup(bundle_filter);
+
+       char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+       if(oc!=NULL){
+               oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+               
if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+                       char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
+                       if(topics!=NULL){
+
+                               topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
+                               char* bottom=strchr(topics,')');
+                               *bottom='\0';
+
+                               topic=strdup(topics);
+
+                       }
+               }
+       }
+
+       free(filter);
+
+       return topic;
+
+}
+
+array_list_pt pubsub_getTopicsFromString(char* string){
+
+       array_list_pt topic_list = NULL;
+       arrayList_create(&topic_list);
+
+       char* topics = strdup(string);
+
+       char* topic = strtok(topics,",;|# ");
+       arrayList_add(topic_list,strdup(topic));
+
+       while( (topic = strtok(NULL,",;|# ")) !=NULL){
+               arrayList_add(topic_list,strdup(topic));
+       }
+
+       free(topics);
+
+       return topic_list;
+
+}
+
+/**
+ * Loop through all bundles and look for the bundle with the keys inside.
+ * If no key bundle found, return NULL
+ *
+ * Caller is responsible for freeing the object
+ */
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
+{
+       array_list_pt bundles = NULL;
+       bundleContext_getBundles(ctx, &bundles);
+       int nrOfBundles = arrayList_size(bundles);
+       long bundle_id = -1;
+       char* result = NULL;
+
+       for (int i = 0; i < nrOfBundles; i++){
+               bundle_pt b = arrayList_get(bundles, i);
+
+               /* Skip bundle 0 (framework bundle) since it has no path nor 
revisions */
+               bundle_getBundleId(b, &bundle_id);
+               if(bundle_id==0){
+                       continue;
+               }
+
+               char* dir = NULL;
+               bundle_getEntry(b, ".", &dir);
+
+               char cert_dir[MAX_KEYBUNDLE_LENGTH];
+               snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", 
dir);
+
+               struct stat s;
+               int err = stat(cert_dir, &s);
+               if (err != -1){
+                       if (S_ISDIR(s.st_mode)){
+                               result = dir;
+                               break;
+                       }
+               }
+
+               free(dir);
+       }
+
+       arrayList_destroy(bundles);
+
+       return result;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt 
b/pubsub/pubsub_topology_manager/CMakeLists.txt
index 784ca21..73b9ecb 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -15,22 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+add_bundle(celix_pubsub_topology_manager
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_topology_manager"
     VERSION "1.0.0"
     SOURCES
-       private/src/pstm_activator.c
-       private/src/pubsub_topology_manager.c
-       ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-       ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c    
+       src/pstm_activator.c
+       src/pubsub_topology_manager.c
+       src/pubsub_topology_manager.h
 )
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework 
Celix::log_helper Celix::pubsub_spi)
 
-bundle_files(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-   
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
+get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
+bundle_files(celix_pubsub_topology_manager
+       ${DESC}
     DESTINATION "META-INF/descriptors/services"
 )
-target_link_libraries(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 PRIVATE Celix::framework Celix::log_helper)
 
+install_bundle(celix_pubsub_topology_manager)
 
-install_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager)
+add_library(Celix::pubsub_topology_manager ALIAS celix_pubsub_topology_manager)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h 
b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
deleted file mode 100644
index 7614e0c..0000000
--- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_topology_manager.h
- *
- *  \date       Sep 29, 2011
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_TOPOLOGY_MANAGER_H_
-#define PUBSUB_TOPOLOGY_MANAGER_H_
-
-#include "endpoint_listener.h"
-#include "service_reference.h"
-#include "bundle_context.h"
-#include "log_helper.h"
-
-#include "pubsub_common.h"
-#include "pubsub_endpoint.h"
-#include "publisher.h"
-#include "subscriber.h"
-
-
-struct pubsub_topology_manager {
-       bundle_context_pt context;
-
-       celix_thread_mutex_t psaListLock;
-       array_list_pt psaList;
-
-       celix_thread_mutex_t discoveryListLock;
-       hash_map_pt discoveryList; //<serviceReference,NULL>
-
-       celix_thread_mutex_t publicationsLock;
-       hash_map_pt publications; //<topic(string),list<pubsub_ep>>
-
-       celix_thread_mutex_t subscriptionsLock;
-       hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
-
-       log_helper_pt loghelper;
-};
-
-typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, 
log_helper_pt logHelper, pubsub_topology_manager_pt *manager);
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt 
manager);
-celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt 
manager);
-
-celix_status_t pubsub_topologyManager_psaAdded(void *handle, 
service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_psaModified(void *handle, 
service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_psaRemoved(void *handle, 
service_reference_pt reference, void *service);
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, 
service_reference_pt reference, void* service);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, 
service_reference_pt reference, void* service);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, 
service_reference_pt reference, void* service);
-
-celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, 
service_reference_pt reference, void * service);
-celix_status_t pubsub_topologyManager_subscriberModified(void * handle, 
service_reference_pt reference, void * service);
-celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, 
service_reference_pt reference, void * service);
-
-celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, 
array_list_pt listeners);
-celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, 
array_list_pt listeners);
-
-celix_status_t pubsub_topologyManager_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_topologyManager_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP);
-
-#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c 
b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
deleted file mode 100644
index 4d6dd27..0000000
--- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pstm_activator.c
- *
- *  \date       Sep 29, 2011
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "constants.h"
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "service_registration.h"
-
-#include "endpoint_listener.h"
-#include "remote_constants.h"
-#include "listener_hook_service.h"
-#include "log_service.h"
-#include "log_helper.h"
-
-
-#include "pubsub_topology_manager.h"
-#include "publisher_endpoint_announce.h"
-
-struct activator {
-       bundle_context_pt context;
-
-       pubsub_topology_manager_pt manager;
-
-       service_tracker_pt pubsubDiscoveryTracker;
-       service_tracker_pt pubsubAdminTracker;
-       service_tracker_pt pubsubSubscribersTracker;
-
-       listener_hook_service_pt hookService;
-       service_registration_pt hook;
-
-       publisher_endpoint_announce_pt publisherEPDiscover;
-       service_registration_pt publisherEPDiscoverService;
-
-       log_helper_pt loghelper;
-};
-
-
-static celix_status_t bundleActivator_createPSDTracker(struct activator 
*activator, service_tracker_pt *tracker);
-static celix_status_t bundleActivator_createPSATracker(struct activator 
*activator, service_tracker_pt *tracker);
-static celix_status_t bundleActivator_createPSSubTracker(struct activator 
*activator, service_tracker_pt *tracker);
-
-
-static celix_status_t bundleActivator_createPSDTracker(struct activator 
*activator, service_tracker_pt *tracker) {
-       celix_status_t status;
-
-       service_tracker_customizer_pt customizer = NULL;
-
-       status = serviceTrackerCustomizer_create(activator->manager,
-                       NULL,
-                       pubsub_topologyManager_pubsubDiscoveryAdded,
-                       pubsub_topologyManager_pubsubDiscoveryModified,
-                       pubsub_topologyManager_pubsubDiscoveryRemoved,
-                       &customizer);
-
-       if (status == CELIX_SUCCESS) {
-               status = serviceTracker_create(activator->context, (char *) 
PUBSUB_DISCOVERY_SERVICE, customizer, tracker);
-       }
-
-       return status;
-}
-
-static celix_status_t bundleActivator_createPSATracker(struct activator 
*activator, service_tracker_pt *tracker) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       service_tracker_customizer_pt customizer = NULL;
-
-       status = serviceTrackerCustomizer_create(activator->manager,
-                       NULL,
-                       pubsub_topologyManager_psaAdded,
-                       pubsub_topologyManager_psaModified,
-                       pubsub_topologyManager_psaRemoved,
-                       &customizer);
-
-       if (status == CELIX_SUCCESS) {
-               status = serviceTracker_create(activator->context, 
PUBSUB_ADMIN_SERVICE, customizer, tracker);
-       }
-
-       return status;
-}
-
-static celix_status_t bundleActivator_createPSSubTracker(struct activator 
*activator, service_tracker_pt *tracker) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       service_tracker_customizer_pt customizer = NULL;
-
-       status = serviceTrackerCustomizer_create(activator->manager,
-                       NULL,
-                       pubsub_topologyManager_subscriberAdded,
-                       pubsub_topologyManager_subscriberModified,
-                       pubsub_topologyManager_subscriberRemoved,
-                       &customizer);
-
-       if (status == CELIX_SUCCESS) {
-               status = serviceTracker_create(activator->context, 
PUBSUB_SUBSCRIBER_SERVICE_NAME, customizer, tracker);
-       }
-
-       return status;
-}
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = NULL;
-
-       activator = calloc(1,sizeof(struct activator));
-
-       if (!activator) {
-               return CELIX_ENOMEM;
-       }
-
-       activator->context = context;
-
-       logHelper_create(context, &activator->loghelper);
-       logHelper_start(activator->loghelper);
-
-       status = pubsub_topologyManager_create(context, activator->loghelper, 
&activator->manager);
-       if (status == CELIX_SUCCESS) {
-               status = bundleActivator_createPSDTracker(activator, 
&activator->pubsubDiscoveryTracker);
-               if (status == CELIX_SUCCESS) {
-                       status = bundleActivator_createPSATracker(activator, 
&activator->pubsubAdminTracker);
-                       if (status == CELIX_SUCCESS) {
-                               status = 
bundleActivator_createPSSubTracker(activator, 
&activator->pubsubSubscribersTracker);
-                               if (status == CELIX_SUCCESS) {
-                                       *userData = activator;
-                               }
-                       }
-               }
-       }
-
-       if(status != CELIX_SUCCESS){
-               bundleActivator_destroy(activator, context);
-       }
-
-       return status;
-}
-
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       publisher_endpoint_announce_pt pubEPDiscover = calloc(1, 
sizeof(*pubEPDiscover));
-       pubEPDiscover->handle = activator->manager;
-       pubEPDiscover->announcePublisher = 
pubsub_topologyManager_announcePublisher;
-       pubEPDiscover->removePublisher = pubsub_topologyManager_removePublisher;
-       activator->publisherEPDiscover = pubEPDiscover;
-
-       status += bundleContext_registerService(context, (char *) 
PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, pubEPDiscover, NULL, 
&activator->publisherEPDiscoverService);
-
-
-       listener_hook_service_pt hookService = calloc(1,sizeof(*hookService));
-       hookService->handle = activator->manager;
-       hookService->added = pubsub_topologyManager_publisherTrackerAdded;
-       hookService->removed = pubsub_topologyManager_publisherTrackerRemoved;
-       activator->hookService = hookService;
-
-       status += bundleContext_registerService(context, (char *) 
OSGI_FRAMEWORK_LISTENER_HOOK_SERVICE_NAME, hookService, NULL, &activator->hook);
-
-       /* NOTE: Enable those line in order to remotely expose the topic_info 
service
-       properties_pt props = properties_create();
-       properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, 
(char *) PUBSUB_TOPIC_INFO_SERVICE);
-       status += bundleContext_registerService(context, (char *) 
PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, 
&activator->topicInfoService);
-       */
-       status += serviceTracker_open(activator->pubsubAdminTracker);
-
-       status += serviceTracker_open(activator->pubsubDiscoveryTracker);
-
-       status += serviceTracker_open(activator->pubsubSubscribersTracker);
-
-
-       return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       serviceTracker_close(activator->pubsubSubscribersTracker);
-       serviceTracker_close(activator->pubsubDiscoveryTracker);
-       serviceTracker_close(activator->pubsubAdminTracker);
-
-       serviceRegistration_unregister(activator->publisherEPDiscoverService);
-       free(activator->publisherEPDiscover);
-
-       serviceRegistration_unregister(activator->hook);
-       free(activator->hookService);
-
-       return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       struct activator *activator = userData;
-       if (activator == NULL) {
-               status = CELIX_BUNDLE_EXCEPTION;
-       } else {
-
-               if(activator->pubsubSubscribersTracker!=NULL){
-                       
serviceTracker_destroy(activator->pubsubSubscribersTracker);
-               }
-               if(activator->pubsubDiscoveryTracker!=NULL){
-                       
serviceTracker_destroy(activator->pubsubDiscoveryTracker);
-               }
-               if(activator->pubsubAdminTracker!=NULL){
-                       serviceTracker_destroy(activator->pubsubAdminTracker);
-               }
-
-               if(activator->manager!=NULL){
-                       status = 
pubsub_topologyManager_destroy(activator->manager);
-               }
-
-               logHelper_stop(activator->loghelper);
-               logHelper_destroy(&activator->loghelper);
-
-               free(activator);
-       }
-
-       return status;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c 
b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
deleted file mode 100644
index 987d864..0000000
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ /dev/null
@@ -1,723 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_topology_manager.c
- *
- *  \date       Sep 29, 2011
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdbool.h>
-
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "constants.h"
-#include "module.h"
-#include "bundle.h"
-#include "remote_service_admin.h"
-#include "remote_constants.h"
-#include "filter.h"
-#include "listener_hook_service.h"
-#include "utils.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_service.h"
-#include "log_helper.h"
-
-#include "publisher_endpoint_announce.h"
-#include "pubsub_topology_manager.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_admin.h"
-#include "pubsub_utils.h"
-
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, 
log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       *manager = calloc(1, sizeof(**manager));
-       if (!*manager) {
-               return CELIX_ENOMEM;
-       }
-
-       (*manager)->context = context;
-
-       celix_thread_mutexattr_t psaAttr;
-       celixThreadMutexAttr_create(&psaAttr);
-       celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-       status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
-       celixThreadMutexAttr_destroy(&psaAttr);
-
-       status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
-       status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
-       status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
-
-       arrayList_create(&(*manager)->psaList);
-
-       (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, 
NULL, serviceReference_equals2, NULL);
-       (*manager)->publications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-       (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-
-       (*manager)->loghelper = logHelper;
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt 
manager) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&manager->discoveryListLock);
-       hashMap_destroy(manager->discoveryList, false, false);
-       celixThreadMutex_unlock(&manager->discoveryListLock);
-       celixThreadMutex_destroy(&manager->discoveryListLock);
-
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_destroy(manager->psaList);
-       celixThreadMutex_unlock(&manager->psaListLock);
-       celixThreadMutex_destroy(&manager->psaListLock);
-
-       celixThreadMutex_lock(&manager->publicationsLock);
-       hash_map_iterator_pt pubit = 
hashMapIterator_create(manager->publications);
-       while(hashMapIterator_hasNext(pubit)){
-               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(pubit);
-               int i;
-               for(i=0;i<arrayList_size(l);i++){
-                       
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-               }
-               arrayList_destroy(l);
-       }
-       hashMapIterator_destroy(pubit);
-       hashMap_destroy(manager->publications, true, false);
-       celixThreadMutex_unlock(&manager->publicationsLock);
-       celixThreadMutex_destroy(&manager->publicationsLock);
-
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       hash_map_iterator_pt subit = 
hashMapIterator_create(manager->subscriptions);
-       while(hashMapIterator_hasNext(subit)){
-               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(subit);
-               int i;
-               for(i=0;i<arrayList_size(l);i++){
-                       
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-               }
-               arrayList_destroy(l);
-       }
-       hashMapIterator_destroy(subit);
-       hashMap_destroy(manager->subscriptions, true, false);
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
-       celixThreadMutex_destroy(&manager->subscriptionsLock);
-
-       free(manager);
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_psaAdded(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-       int i;
-
-       pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
-       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added 
PSA");
-
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_add(manager->psaList, psa);
-       celixThreadMutex_unlock(&manager->psaListLock);
-
-       // Add already detected subscriptions to new PSA
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       hash_map_iterator_pt subscriptionsIterator = 
hashMapIterator_create(manager->subscriptions);
-
-       while (hashMapIterator_hasNext(subscriptionsIterator)) {
-               array_list_pt sub_ep_list = 
hashMapIterator_nextValue(subscriptionsIterator);
-               for(i=0;i<arrayList_size(sub_ep_list);i++){
-                       status += psa->addSubscription(psa->admin, 
(pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
-               }
-       }
-
-       hashMapIterator_destroy(subscriptionsIterator);
-
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-       // Add already detected publications to new PSA
-       status = celixThreadMutex_lock(&manager->publicationsLock);
-       hash_map_iterator_pt publicationsIterator = 
hashMapIterator_create(manager->publications);
-
-       while (hashMapIterator_hasNext(publicationsIterator)) {
-               array_list_pt pub_ep_list = 
hashMapIterator_nextValue(publicationsIterator);
-               for(i=0;i<arrayList_size(pub_ep_list);i++){
-                       status += psa->addPublication(psa->admin, 
(pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
-               }
-       }
-
-       hashMapIterator_destroy(publicationsIterator);
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_psaModified(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       // Nop...
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_psaRemoved(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
-
-       /* Deactivate all publications */
-       celixThreadMutex_lock(&manager->publicationsLock);
-
-       hash_map_iterator_pt pubit = 
hashMapIterator_create(manager->publications);
-       while(hashMapIterator_hasNext(pubit)){
-               hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
-               char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
-               // Extract scope/topic name from key
-               char scope[MAX_SCOPE_LEN];
-               char topic[MAX_TOPIC_LEN];
-               sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
-               array_list_pt pubEP_list = 
(array_list_pt)hashMapEntry_getValue(pub_entry);
-
-               status = psa->closeAllPublications(psa->admin,scope,topic);
-
-               if(status==CELIX_SUCCESS){
-                       celixThreadMutex_lock(&manager->discoveryListLock);
-                       hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-                       while(hashMapIterator_hasNext(iter)){
-                               service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                               publisher_endpoint_announce_pt disc = NULL;
-                               bundleContext_getService(manager->context, 
disc_sr, (void**) &disc);
-                               const char* fwUUID = NULL;
-                               
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-                               int i;
-                               for(i=0;i<arrayList_size(pubEP_list);i++){
-                                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                                       
if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
-                                               
disc->removePublisher(disc->handle,pubEP);
-                                       }
-                               }
-                               bundleContext_ungetService(manager->context, 
disc_sr, NULL);
-                       }
-                       hashMapIterator_destroy(iter);
-                       celixThreadMutex_unlock(&manager->discoveryListLock);
-               }
-       }
-       hashMapIterator_destroy(pubit);
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
-
-       /* Deactivate all subscriptions */
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       hash_map_iterator_pt subit = 
hashMapIterator_create(manager->subscriptions);
-       while(hashMapIterator_hasNext(subit)){
-               // TODO do some error checking
-               char* scope_topic = (char*)hashMapIterator_nextKey(subit);
-               char scope[MAX_TOPIC_LEN];
-               char topic[MAX_TOPIC_LEN];
-               memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
-               memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
-               sscanf(scope_topic, "%[^:]:%s", scope, topic );
-               status += psa->closeAllSubscriptions(psa->admin,scope, topic);
-       }
-       hashMapIterator_destroy(subit);
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_removeElement(manager->psaList, psa);
-       celixThreadMutex_unlock(&manager->psaListLock);
-
-       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed 
PSA");
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-       //subscriber_service_pt subscriber = (subscriber_service_pt)service;
-
-       pubsub_endpoint_pt sub = NULL;
-       if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == 
CELIX_SUCCESS){
-               celixThreadMutex_lock(&manager->subscriptionsLock);
-               char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
-
-               array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
-               if(sub_list_by_topic==NULL){
-                       arrayList_create(&sub_list_by_topic);
-                       
hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
-               }
-               free(sub_key);
-               arrayList_add(sub_list_by_topic,sub);
-
-               celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-               int j;
-               double score = 0;
-               double best_score = 0;
-               pubsub_admin_service_pt best_psa = NULL;
-               celixThreadMutex_lock(&manager->psaListLock);
-               for(j=0;j<arrayList_size(manager->psaList);j++){
-                       pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                       psa->matchEndpoint(psa->admin,sub,&score);
-                       if(score>best_score){ /* We have a new winner! */
-                               best_score = score;
-                               best_psa = psa;
-                       }
-               }
-
-               if(best_psa != NULL && best_score>0){
-                       best_psa->addSubscription(best_psa->admin,sub);
-               }
-
-               // Inform discoveries for interest in the topic
-               celixThreadMutex_lock(&manager->discoveryListLock);
-               hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-               while(hashMapIterator_hasNext(iter)){
-                       service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                       publisher_endpoint_announce_pt disc = NULL;
-                       bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->interestedInTopic(disc->handle, sub->scope, 
sub->topic);
-                       bundleContext_ungetService(manager->context, disc_sr, 
NULL);
-               }
-               hashMapIterator_destroy(iter);
-               celixThreadMutex_unlock(&manager->discoveryListLock);
-
-               celixThreadMutex_unlock(&manager->psaListLock);
-       }
-       else{
-               status=CELIX_INVALID_BUNDLE_CONTEXT;
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberModified(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       // Nop...
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       pubsub_endpoint_pt subcmp = NULL;
-       if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) 
== CELIX_SUCCESS){
-
-               int j,k;
-
-               // Inform discoveries that we not interested in the topic any 
more
-               celixThreadMutex_lock(&manager->discoveryListLock);
-               hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-               while(hashMapIterator_hasNext(iter)){
-                       service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                       publisher_endpoint_announce_pt disc = NULL;
-                       bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->uninterestedInTopic(disc->handle, subcmp->scope, 
subcmp->topic);
-                       bundleContext_ungetService(manager->context, disc_sr, 
NULL);
-               }
-               hashMapIterator_destroy(iter);
-               celixThreadMutex_unlock(&manager->discoveryListLock);
-
-               celixThreadMutex_lock(&manager->subscriptionsLock);
-               celixThreadMutex_lock(&manager->psaListLock);
-
-               char *sub_key = 
createScopeTopicKey(subcmp->scope,subcmp->topic);
-               array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
-               free(sub_key);
-               if(sub_list_by_topic!=NULL){
-                       for(j=0;j<arrayList_size(sub_list_by_topic);j++){
-                               pubsub_endpoint_pt sub = 
arrayList_get(sub_list_by_topic,j);
-                               if(pubsubEndpoint_equals(sub,subcmp)){
-                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                               /* No problem with invoking 
removal on all psa's, only the one that manage this topic will do something */
-                                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               
psa->removeSubscription(psa->admin,sub);
-                                       }
-
-                               }
-                               arrayList_remove(sub_list_by_topic,j);
-
-                               /* If it was the last subscriber for this 
topic, tell PSA to close the ZMQ socket */
-                               if(arrayList_size(sub_list_by_topic)==0){
-                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               
psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
-                                       }
-                               }
-
-                               pubsubEndpoint_destroy(sub);
-
-                       }
-               }
-
-               celixThreadMutex_unlock(&manager->psaListLock);
-               celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-               pubsubEndpoint_destroy(subcmp);
-
-       }
-       else{
-               status=CELIX_INVALID_BUNDLE_CONTEXT;
-       }
-
-       return status;
-
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, 
service_reference_pt reference, void* service) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
-       publisher_endpoint_announce_pt disc = 
(publisher_endpoint_announce_pt)service;
-
-       const char* fwUUID = NULL;
-
-       
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-       if(fwUUID==NULL){
-               printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
-               return CELIX_INVALID_BUNDLE_CONTEXT;
-       }
-
-       celixThreadMutex_lock(&manager->publicationsLock);
-
-       celixThreadMutex_lock(&manager->discoveryListLock);
-       hashMap_put(manager->discoveryList, reference, NULL);
-       celixThreadMutex_unlock(&manager->discoveryListLock);
-
-       hash_map_iterator_pt iter = 
hashMapIterator_create(manager->publications);
-       while(hashMapIterator_hasNext(iter)){
-               array_list_pt pubEP_list = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               for(int i = 0; i < arrayList_size(pubEP_list); i++) {
-                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && 
(pubEP->endpoint!=NULL)){
-                               status += 
disc->announcePublisher(disc->handle,pubEP);
-                       }
-               }
-       }
-       hashMapIterator_destroy(iter);
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
-
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       iter = hashMapIterator_create(manager->subscriptions);
-
-       while(hashMapIterator_hasNext(iter)) {
-               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               int i;
-               for(i=0;i<arrayList_size(l);i++){
-                       pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
-
-                       disc->interestedInTopic(disc->handle, subEp->scope, 
subEp->topic);
-               }
-       }
-       hashMapIterator_destroy(iter);
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, 
reference, service);
-       if (status == CELIX_SUCCESS) {
-               status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, 
reference, service);
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, 
service_reference_pt reference, void * service) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_topology_manager_pt manager = handle;
-
-       celixThreadMutex_lock(&manager->discoveryListLock);
-
-
-       if (hashMap_remove(manager->discoveryList, reference)) {
-               logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, 
"EndpointListener Removed");
-       }
-
-       celixThreadMutex_unlock(&manager->discoveryListLock);
-
-       return status;
-}
-
-
-celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, 
array_list_pt listeners) {
-
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       int l_index;
-
-       for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
-
-               listener_hook_info_pt info = arrayList_get(listeners, l_index);
-
-               pubsub_endpoint_pt pub = NULL;
-               if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) 
== CELIX_SUCCESS){
-
-                       celixThreadMutex_lock(&manager->publicationsLock);
-                       char *pub_key = createScopeTopicKey(pub->scope, 
pub->topic);
-                       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications, pub_key);
-                       if(pub_list_by_topic==NULL){
-                               arrayList_create(&pub_list_by_topic);
-                               
hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
-                       }
-                       free(pub_key);
-                       arrayList_add(pub_list_by_topic,pub);
-
-                       celixThreadMutex_unlock(&manager->publicationsLock);
-
-                       int j;
-                       double score = 0;
-                       double best_score = 0;
-                       pubsub_admin_service_pt best_psa = NULL;
-                       celixThreadMutex_lock(&manager->psaListLock);
-
-                       for(j=0;j<arrayList_size(manager->psaList);j++){
-                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                               psa->matchEndpoint(psa->admin,pub,&score);
-                               if(score>best_score){ /* We have a new winner! 
*/
-                                       best_score = score;
-                                       best_psa = psa;
-                               }
-                       }
-
-                       if(best_psa != NULL && best_score>0){
-                               status = 
best_psa->addPublication(best_psa->admin,pub);
-                               if(status==CELIX_SUCCESS){
-                                       
celixThreadMutex_lock(&manager->discoveryListLock);
-                                       hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-                                       while(hashMapIterator_hasNext(iter)){
-                                               service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                                               publisher_endpoint_announce_pt 
disc = NULL;
-                                               
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                                               
disc->announcePublisher(disc->handle,pub);
-                                               
bundleContext_ungetService(manager->context, disc_sr, NULL);
-                                       }
-                                       hashMapIterator_destroy(iter);
-                                       
celixThreadMutex_unlock(&manager->discoveryListLock);
-                               }
-                       }
-
-                       celixThreadMutex_unlock(&manager->psaListLock);
-
-               }
-
-       }
-
-       return status;
-
-}
-
-
-celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, 
array_list_pt listeners) {
-       celix_status_t status = CELIX_SUCCESS;
-       pubsub_topology_manager_pt manager = handle;
-
-       int l_index;
-
-       for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
-
-               listener_hook_info_pt info = arrayList_get(listeners, l_index);
-
-               pubsub_endpoint_pt pubcmp = NULL;
-               if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) 
== CELIX_SUCCESS){
-
-
-                       int j,k;
-                       celixThreadMutex_lock(&manager->psaListLock);
-                       celixThreadMutex_lock(&manager->publicationsLock);
-
-                       char *pub_key = createScopeTopicKey(pubcmp->scope, 
pubcmp->topic);
-                       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-                       if(pub_list_by_topic!=NULL){
-                               
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
-                                       pubsub_endpoint_pt pub = 
arrayList_get(pub_list_by_topic,j);
-                                       if(pubsubEndpoint_equals(pub,pubcmp)){
-                                               
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                                       pubsub_admin_service_pt 
psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                                       status = 
psa->removePublication(psa->admin,pub);
-                                                       
if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
-                                                               
celixThreadMutex_lock(&manager->discoveryListLock);
-                                                               
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-                                                               
while(hashMapIterator_hasNext(iter)){
-                                                                       
service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                                                                       
publisher_endpoint_announce_pt disc = NULL;
-                                                                       
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                                                                       
disc->removePublisher(disc->handle,pub);
-                                                                       
bundleContext_ungetService(manager->context, disc_sr, NULL);
-                                                               }
-                                                               
hashMapIterator_destroy(iter);
-                                                               
celixThreadMutex_unlock(&manager->discoveryListLock);
-                                                       }
-                                                       else if(status ==  
CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not 
handle this endpoint */
-                                                               status = 
CELIX_SUCCESS;
-                                                       }
-                                               }
-                                               //}
-                                               
arrayList_remove(pub_list_by_topic,j);
-
-                                               /* If it was the last publisher 
for this topic, tell PSA to close the ZMQ socket and then inform the discovery 
*/
-                                               
if(arrayList_size(pub_list_by_topic)==0){
-                                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                                               
pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                                               
psa->closeAllPublications(psa->admin,pub->scope, pub->topic);
-                                                       }
-                                               }
-
-                                               pubsubEndpoint_destroy(pub);
-                                       }
-
-                               }
-                       }
-
-                       celixThreadMutex_unlock(&manager->publicationsLock);
-                       celixThreadMutex_unlock(&manager->psaListLock);
-
-                       free(pub_key);
-
-                       pubsubEndpoint_destroy(pubcmp);
-
-               }
-
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP){
-       celix_status_t status = CELIX_SUCCESS;
-       printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
-
-       pubsub_topology_manager_pt manager = handle;
-       celixThreadMutex_lock(&manager->psaListLock);
-       celixThreadMutex_lock(&manager->publicationsLock);
-
-       char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
-
-       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-       if(pub_list_by_topic==NULL){
-               arrayList_create(&pub_list_by_topic);
-               
hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
-       }
-       free(pub_key);
-
-       /* Shouldn't be any other duplicate, since it's filtered out by the 
discovery */
-       pubsub_endpoint_pt p = NULL;
-       pubsubEndpoint_clone(pubEP, &p);
-       arrayList_add(pub_list_by_topic,p);
-
-       int j;
-       double score = 0;
-       double best_score = 0;
-       pubsub_admin_service_pt best_psa = NULL;
-
-       for(j=0;j<arrayList_size(manager->psaList);j++){
-               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-               psa->matchEndpoint(psa->admin,p,&score);
-               if(score>best_score){ /* We have a new winner! */
-                       best_score = score;
-                       best_psa = psa;
-               }
-       }
-
-       if(best_psa != NULL && best_score>0){
-               best_psa->addPublication(best_psa->admin,p);
-       }
-       else{
-               status = CELIX_ILLEGAL_STATE;
-       }
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
-       celixThreadMutex_unlock(&manager->psaListLock);
-
-       return status;
-}
-
-celix_status_t pubsub_topologyManager_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP){
-       celix_status_t status = CELIX_SUCCESS;
-       printf("PSTM: Publisher removed for topic %s [fwUUID=%s, 
ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
-
-       pubsub_topology_manager_pt manager = handle;
-       celixThreadMutex_lock(&manager->psaListLock);
-       celixThreadMutex_lock(&manager->publicationsLock);
-       int i;
-
-       char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
-       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-       if(pub_list_by_topic==NULL){
-               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint);
-               status = CELIX_ILLEGAL_STATE;
-       }
-       else{
-
-               pubsub_endpoint_pt p = NULL;
-               bool found = false;
-
-               for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
-                       p = 
(pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
-                       found = pubsubEndpoint_equals(p,pubEP);
-               }
-
-               if(found && p !=NULL){
-
-                       for(i=0;i<arrayList_size(manager->psaList);i++){
-                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                               /* No problem with invoking removal on all 
psa's, only the one that manage this topic will do something */
-                               psa->removePublication(psa->admin,p);
-                       }
-
-                       arrayList_removeElement(pub_list_by_topic,p);
-
-                       /* If it was the last publisher for this topic, tell 
PSA to close the ZMQ socket */
-                       if(arrayList_size(pub_list_by_topic)==0){
-
-                               for(i=0;i<arrayList_size(manager->psaList);i++){
-                                       pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                                       
psa->closeAllPublications(psa->admin,p->scope, p->topic);
-                               }
-                       }
-
-                       pubsubEndpoint_destroy(p);
-               }
-
-
-       }
-       free(pub_key);
-       celixThreadMutex_unlock(&manager->publicationsLock);
-       celixThreadMutex_unlock(&manager->psaListLock);
-
-
-       return status;
-}
-

Reply via email to