This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/main by this push:
     new 05ad6c1  [KARAF-7170] Allow to define event.topics property in all 
collectors
     new 6d38c21  Merge pull request #267 from jbonofre/KARAF-7170
05ad6c1 is described below

commit 05ad6c12a4137cc3cc40d35bfecfb78447cd1345
Author: Jean-Baptiste Onofré <jbono...@apache.org>
AuthorDate: Wed Sep 8 16:50:57 2021 +0200

    [KARAF-7170] Allow to define event.topics property in all collectors
---
 .../collector/camel/DecanterEventNotifier.java     | 21 ++++++++++++-----
 .../collector/camel/DecanterInterceptStrategy.java | 11 ++++++++-
 .../collector/camel/DecanterEventNotifierTest.java | 11 ++++-----
 .../configadmin/ConfigAdminCollector.java          | 11 +++++----
 collector/dropwizard/pom.xml                       |  4 ++++
 .../dropwizard/DecanterReporterCollector.java      | 27 +++++++++++++++++++++-
 .../decanter/collector/druid/DruidCollector.java   |  3 ++-
 .../elasticsearch/ElasticsearchCollector.java      | 21 +++++++++--------
 .../collector/eventadmin/EventCollector.java       |  7 +++---
 .../collector/file/DecanterTailerListener.java     |  6 ++++-
 .../decanter/collector/jdbc/JdbcCollector.java     | 10 ++++----
 .../decanter/collector/jdbc/TestJdbcCollector.java |  2 +-
 .../jetty/DecanterCollectorJettyHandler.java       |  4 +++-
 .../karaf/decanter/collector/jmx/JmxCollector.java |  7 ++++--
 .../karaf/decanter/collector/log/LogCollector.java |  5 ++--
 .../collector/log/socket/SocketCollector.java      |  9 ++++----
 .../collector/log/socket/SocketCollectorTest.java  |  2 +-
 .../collector/openstack/OpenstackCollector.java    |  7 ++----
 .../decanter/collector/oshi/OshiCollector.java     |  5 +++-
 .../collector/prometheus/PrometheusCollector.java  |  4 +++-
 .../decanter/collector/redis/RedisCollector.java   |  4 +++-
 .../rest/servlet/RestServletCollector.java         |  7 +++---
 .../decanter/collector/rest/RestCollector.java     |  3 ++-
 .../karaf/decanter/collector/snmp/SnmpPoller.java  |  4 +++-
 .../decanter/collector/soap/SoapCollector.java     |  6 ++---
 .../decanter/collector/socket/SocketCollector.java |  8 +++----
 .../decanter/collector/system/SystemCollector.java |  3 ++-
 .../itests/collector/CamelCollectorTest.java       |  3 +--
 .../src/main/asciidoc/user-guide/collectors.adoc   | 12 ++++++++++
 .../passthrough/PassThroughProcessor.java          |  5 +---
 30 files changed, 156 insertions(+), 76 deletions(-)

diff --git 
a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
 
b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
index 4aa8eea..7f0b7a7 100644
--- 
a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
+++ 
b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
@@ -32,18 +32,27 @@ public class DecanterEventNotifier extends 
EventNotifierSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DecanterEventNotifier.class.getName());
 
-    private EventAdmin eventAdmin;
+    private EventAdmin dispatcher;
+    private String topic = "decanter/collect/camel/event";
     private String camelContextMatcher = ".*";
     private String routeMatcher = ".*";
     private DefaultExchangeExtender dextender = new DefaultExchangeExtender();
     private DecanterCamelEventExtender extender;
 
-    public EventAdmin getEventAdmin() {
-        return eventAdmin;
+    public EventAdmin getDispatcher() {
+        return dispatcher;
     }
 
-    public void setEventAdmin(EventAdmin eventAdmin) {
-        this.eventAdmin = eventAdmin;
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
     }
 
     public void setCamelContextMatcher(String camelContextMatcher) {
@@ -151,7 +160,7 @@ public class DecanterEventNotifier extends 
EventNotifierSupport {
                         extender.extend(eventMap, (Exchange) source);
                     }
                 }
-                eventAdmin.postEvent(new Event("decanter/collect/camel/event", 
eventMap));
+                dispatcher.postEvent(new Event(topic, eventMap));
             } catch (Exception ex) {
                 LOG.warn("Failed to handle event", ex);
             }
diff --git 
a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
 
b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
index 4b7cad8..5ff6bae 100644
--- 
a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
+++ 
b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
@@ -31,6 +31,7 @@ import org.osgi.service.event.EventAdmin;
 public class DecanterInterceptStrategy implements InterceptStrategy {
 
     private EventAdmin dispatcher;
+    private String topic = "decanter/collect/camel/tracer";
     private DefaultExchangeExtender dextender = new DefaultExchangeExtender();
     private DecanterCamelEventExtender extender;
 
@@ -72,7 +73,7 @@ public class DecanterInterceptStrategy implements 
InterceptStrategy {
                 data.put(header.substring("decanter.".length()), 
exchange.getIn().getHeader(header));
             }
         }
-        Event event = new Event("decanter/collect/camel/tracer", data);
+        Event event = new Event(topic, data);
         dispatcher.postEvent(event);
     }
     
@@ -87,6 +88,14 @@ public class DecanterInterceptStrategy implements 
InterceptStrategy {
     public void setDispatcher(EventAdmin dispatcher) {
         this.dispatcher = dispatcher;
     }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
     
     public void setIncludeBody(boolean includeBody) {
         dextender.setIncludeBody(includeBody);
diff --git 
a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
 
b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
index 83cb37f..036f9d3 100644
--- 
a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
+++ 
b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
@@ -20,7 +20,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.spi.CamelEvent;
 import org.junit.Assert;
 import org.junit.Test;
 import org.osgi.service.event.Event;
@@ -33,7 +32,7 @@ public class DecanterEventNotifierTest {
     public void testEventNotifier() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
 
@@ -73,7 +72,7 @@ public class DecanterEventNotifierTest {
     public void testCamelContextFilter() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setCamelContextMatcher("foo");
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -88,7 +87,7 @@ public class DecanterEventNotifierTest {
     public void testRouteIdFilter() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setCamelContextMatcher(".*");
         notifier.setRouteMatcher("foo");
 
@@ -104,7 +103,7 @@ public class DecanterEventNotifierTest {
     public void testIgnoredEvents() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setIgnoreCamelContextEvents(true);
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -121,7 +120,7 @@ public class DecanterEventNotifierTest {
         DecanterEventNotifier notifier = new DecanterEventNotifier();
         notifier.setIgnoreCamelContextEvents(true);
         notifier.setIgnoreRouteEvents(true);
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setExtender(new TestExtender());
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
diff --git 
a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
 
b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
index 9757e0d..988bc25 100644
--- 
a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
+++ 
b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
@@ -27,6 +27,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class ConfigAdminCollector implements 
ConfigurationListener {
     @Reference
     private ConfigurationAdmin configurationAdmin;
 
-    private Dictionary<String, Object> properties;
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext componentContext) {
@@ -57,7 +58,7 @@ public class ConfigAdminCollector implements 
ConfigurationListener {
     }
 
     public void activate(Dictionary<String, Object> properties) {
-        this.properties = properties;
+        this.config = properties;
     }
 
     @Override
@@ -94,12 +95,14 @@ public class ConfigAdminCollector implements 
ConfigurationListener {
         }
 
         try {
-            PropertiesPreparator.prepare(data, properties);
+            PropertiesPreparator.prepare(data, config);
         } catch (Exception e) {
             // nothing to do
         }
 
-        dispatcher.postEvent(new Event("decanter/collect/configadmin", data));
+        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) config.get(EventConstants.EVENT_TOPIC) : 
"decanter/collect/configadmin";
+
+        dispatcher.postEvent(new Event(topic, data));
     }
 
 }
diff --git a/collector/dropwizard/pom.xml b/collector/dropwizard/pom.xml
index d922421..56d7ff7 100644
--- a/collector/dropwizard/pom.xml
+++ b/collector/dropwizard/pom.xml
@@ -35,6 +35,10 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.karaf.decanter.collector</groupId>
+            <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-core</artifactId>
             <version>4.2.3</version>
diff --git 
a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
 
b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
index c3b175b..571a9c0 100644
--- 
a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
+++ 
b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
@@ -19,13 +19,18 @@
 package org.apache.karaf.decanter.collector.dropwizard;
 
 import com.codahale.metrics.*;
+import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 
 import java.net.InetAddress;
+import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -45,6 +50,17 @@ public class DecanterReporterCollector implements Runnable {
     @Reference(cardinality = ReferenceCardinality.OPTIONAL)
     public MetricSet metricRegistry;
 
+    public Dictionary<String, Object> config;
+
+    @Activate
+    public void activate(ComponentContext componentContext) {
+        activate(componentContext.getProperties());
+    }
+
+    public void activate(Dictionary<String, Object> config) {
+        this.config = config;
+    }
+
     @Override
     public void run() {
         Map<String, Metric> metrics = metricRegistry.getMetrics();
@@ -95,7 +111,16 @@ public class DecanterReporterCollector implements Runnable {
                 data.put("Mean Rate", timer.getMeanRate());
                 populateSnapshot(timer.getSnapshot(), data);
             }
-            Event event = new Event("decanter/collect/dropwizard", data);
+
+            try {
+                PropertiesPreparator.prepare(data, config);
+            } catch (Exception e) {
+                // nothing to do
+            }
+
+            String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/dropwizard";
+
+            Event event = new Event(topic, data);
             dispatcher.postEvent(event);
         }
     }
diff --git 
a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
 
b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
index 5122bfb..c3a0f08 100644
--- 
a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
+++ 
b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
@@ -24,6 +24,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +81,7 @@ public class DruidCollector implements Runnable {
                     data.put("query", key.substring("query.".length()));
                     data.putAll(executeQuery(druidBroker, (String) 
config.get(key)));
                     PropertiesPreparator.prepare(data, config);
-                    String topic = (config.get("topic") != null) ? (String) 
config.get("topic") : "decanter/collect/druid";
+                    String topic = (config.get(EventConstants.EVENT_TOPIC) != 
null) ? (String) config.get(EventConstants.EVENT_TOPIC) : 
"decanter/collect/druid";
                     dispatcher.postEvent(new Event(topic, data));
                 } catch (Exception e) {
                     LOGGER.warn("Can't execute query {}", 
key.substring("query.".length()), e);
diff --git 
a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
 
b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
index 03cad36..8ef9176 100644
--- 
a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
+++ 
b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
@@ -43,6 +43,7 @@ import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,7 @@ public class ElasticsearchCollector implements Runnable {
     @Reference
     private EventAdmin dispatcher;
 
-    private Dictionary<String, Object> configuration;
+    private Dictionary<String, Object> config;
     private RestHighLevelClient restClient;
 
     @Activate
@@ -74,7 +75,7 @@ public class ElasticsearchCollector implements Runnable {
     }
 
     public void activate(Dictionary<String, Object> configuration) {
-        this.configuration = configuration;
+        this.config = configuration;
         String addressesString = (configuration.get("addresses") != null) ? 
configuration.get("addresses").toString() : "http://localhost:9200";;
         String username = (configuration.get("username") != null) ? 
configuration.get("username").toString() : null;
         String password = (configuration.get("password") != null) ? 
configuration.get("password").toString() : null;
@@ -126,12 +127,12 @@ public class ElasticsearchCollector implements Runnable {
     public void run() {
         SearchRequest searchRequest = new SearchRequest();
 
-        String index = (configuration.get("index") != null) ? 
configuration.get("index").toString() : "decanter";
+        String index = (config.get("index") != null) ? 
config.get("index").toString() : "decanter";
         searchRequest.indices(index);
 
         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
 
-        String query = (configuration.get("query") != null) ? 
configuration.get("query").toString() : null;
+        String query = (config.get("query") != null) ? 
config.get("query").toString() : null;
         QueryBuilder queryBuilder;
         if (query == null) {
             queryBuilder = QueryBuilders.matchAllQuery();
@@ -139,17 +140,17 @@ public class ElasticsearchCollector implements Runnable {
             queryBuilder = QueryBuilders.queryStringQuery(query);
         }
         searchSourceBuilder.query(queryBuilder);
-        String fromString = (configuration.get("from") != null) ? 
configuration.get("from").toString() : null;
+        String fromString = (config.get("from") != null) ? 
config.get("from").toString() : null;
         if (fromString != null) {
             int from = Integer.parseInt(fromString);
             searchSourceBuilder.from(from);
         }
-        String sizeString = (configuration.get("size") != null) ? 
configuration.get("size").toString() : null;
+        String sizeString = (config.get("size") != null) ? 
config.get("size").toString() : null;
         if (sizeString != null) {
             int size = Integer.parseInt(sizeString);
             searchSourceBuilder.size(size);
         }
-        String timeoutString = (configuration.get("timeout") != null) ? 
configuration.get("timeout").toString() : null;
+        String timeoutString = (config.get("timeout") != null) ? 
config.get("timeout").toString() : null;
         if (timeoutString != null) {
             int timeout = Integer.parseInt(timeoutString);
             searchSourceBuilder.timeout(new TimeValue(timeout, 
TimeUnit.SECONDS));
@@ -176,12 +177,14 @@ public class ElasticsearchCollector implements Runnable {
             LOGGER.error("Can't query elasticsearch", e);
         }
         try {
-            PropertiesPreparator.prepare(data, configuration);
+            PropertiesPreparator.prepare(data, config);
         } catch (Exception e) {
             LOGGER.warn("Can't prepare event", e);
         }
 
-        dispatcher.postEvent(new Event("decanter/collect/elasticsearch", 
data));
+        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) config.get(EventConstants.EVENT_TOPIC) : 
"decanter/collect/elasticsearch";
+
+        dispatcher.postEvent(new Event(topic, data));
     }
 
     /**
diff --git 
a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
 
b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
index 44d79b0..b29aa33 100644
--- 
a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
+++ 
b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
@@ -23,6 +23,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.osgi.service.event.EventHandler;
 
 import javax.security.auth.Subject;
@@ -38,11 +39,11 @@ public class EventCollector implements EventHandler {
     @Reference
     public EventAdmin dispatcher;
 
-    private Dictionary<String, Object> properties;
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext context) {
-        properties = context.getProperties();
+        config = context.getProperties();
     }
 
     @Override
@@ -68,7 +69,7 @@ public class EventCollector implements EventHandler {
         }
 
         try {
-            PropertiesPreparator.prepare(data, properties);
+            PropertiesPreparator.prepare(data, config);
         } catch (Exception e) {
             // nothing to do
         }
diff --git 
a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
 
b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
index 8f9460d..8cbc143 100644
--- 
a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
+++ 
b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
@@ -36,6 +36,7 @@ import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,8 @@ public class DecanterTailerListener extends 
TailerListenerAdapter {
     private String path;
     private String regex;
     private Pattern compiledRegex;
+
+    private String topic;
     
     /**
      * additional properties provided by the user
@@ -88,6 +91,7 @@ public class DecanterTailerListener extends 
TailerListenerAdapter {
         if (regex != null) {
             compiledRegex  = Pattern.compile(regex);
         }
+        topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/file/";
     }
     
     @Deactivate
@@ -120,7 +124,7 @@ public class DecanterTailerListener extends 
TailerListenerAdapter {
             LOGGER.warn("Can't fully prepare data for the dispatcher", e);
         }
 
-        Event event = new Event("decanter/collect/file/" + type, data);
+        Event event = new Event(topic + type, data);
         dispatcher.postEvent(event);
     }
 
diff --git 
a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
 
b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
index 04dbd0c..ef9d4e7 100644
--- 
a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
+++ 
b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
@@ -58,7 +58,7 @@ public class JdbcCollector implements Runnable {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(JdbcCollector.class);
 
     private String query;
-    private String dispatcherTopic;
+    private String topic;
     private Dictionary<String, Object> properties;
     private Connection connection;
     private PreparedStatement preparedStatement;
@@ -66,15 +66,15 @@ public class JdbcCollector implements Runnable {
     @Activate
     public void activate(ComponentContext context) throws Exception {
         properties = context.getProperties();
-        open(properties);
+        activate(properties);
     }
 
-    public void open(Dictionary<String, Object> config)  throws Exception {
+    public void activate(Dictionary<String, Object> config)  throws Exception {
         query = getProperty(config, "query", null);
         if (query == null) {
             throw new IllegalStateException("Query is mandatory");
         }
-        dispatcherTopic = getProperty(config, EventConstants.EVENT_TOPIC, 
"decanter/collect/jdbc");
+        topic = getProperty(config, EventConstants.EVENT_TOPIC, 
"decanter/collect/jdbc");
 
         connection = dataSource.getConnection();
         preparedStatement = connection.prepareStatement(query);
@@ -101,7 +101,7 @@ public class JdbcCollector implements Runnable {
         List<Map<String, Object>> dataRows = query();
 
         for (Map<String, Object> data : dataRows) {
-            Event event = new Event(dispatcherTopic, data);
+            Event event = new Event(topic, data);
             dispatcher.postEvent(event);
         }
     }
diff --git 
a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
 
b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
index 1d2ecb6..8a464ab 100644
--- 
a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
+++ 
b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
@@ -62,7 +62,7 @@ public class TestJdbcCollector {
         collector.dataSource = dataSource;
         Dictionary<String, Object> config = new Hashtable<>();
         config.put("query", "select * from TEST");
-        collector.open(config);
+        collector.activate(config);
 
         List<Map<String, Object>> dataRows = collector.query();
         Assert.assertEquals(2, dataRows.size());
diff --git 
a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
 
b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
index 4573bbf..8e98d7e 100644
--- 
a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
+++ 
b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
@@ -26,6 +26,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -155,7 +156,8 @@ public class DecanterCollectorJettyHandler implements 
Handler {
         } catch (Exception e) {
             // nothing to do
         }
-        Event event = new Event("decanter/collect/jetty", data);
+        String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jetty";
+        Event event = new Event(topic, data);
         dispatcher.postEvent(event);
     }
 
diff --git 
a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
 
b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
index 495cdad..35a3b0a 100644
--- 
a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
+++ 
b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
@@ -38,6 +38,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,13 +150,15 @@ public class JmxCollector implements Runnable {
                 names.addAll(connection.queryNames(getObjectName(null), null));
             }
 
+            String topic = (properties.get(EventConstants.EVENT_TOPIC) != 
null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : 
"decanter/collect/jmx/";
+
             for (ObjectName name : names) {
                 LOGGER.debug("Harvesting {}", name);
                 try {
                     Map<String, Object> data = harvester.harvestBean(name);
                     PropertiesPreparator.prepare(data, properties);
                     data.put("host", host);
-                    Event event = new Event("decanter/collect/jmx/" + 
this.type + "/" + getTopic(name), data);
+                    Event event = new Event(topic + this.type + "/" + 
getTopic(name), data);
                     LOGGER.debug("Posting for {}", name);
                     dispatcher.postEvent(event);
                 } catch (Exception e) {
@@ -174,7 +177,7 @@ public class JmxCollector implements Runnable {
                     Map<String, Object> data = 
harvester.executeOperation(operation, objectName, operationName, arguments, 
signatures);
                     PropertiesPreparator.prepare(data, properties);
                     data.put("host", host);
-                    Event event = new Event("decanter/collect/jmx/" + 
this.type + "/" + getTopic(objectName), data);
+                    Event event = new Event(topic + this.type + "/" + 
getTopic(objectName), data);
                     dispatcher.postEvent(event);
                 } else {
                     LOGGER.warn("{} is not well configured ({})", operation, 
raw);
diff --git 
a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
 
b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
index 98d64de..5ca4ff2 100644
--- 
a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
+++ 
b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
@@ -33,6 +33,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,8 +117,8 @@ public class LogCollector implements PaxAppender {
         if (loggerName == null || loggerName.isEmpty()) {
             loggerName = "default";
         }
-        String topic = "decanter/collect/log/" + cleanLoggerName(loggerName);
-        this.dispatcher.postEvent(new Event(topic, data));
+        String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/";
+        this.dispatcher.postEvent(new Event(topic + 
cleanLoggerName(loggerName), data));
     }
     
     /*
diff --git 
a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
 
b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
index 11a0486..ce79648 100644
--- 
a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
+++ 
b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
@@ -45,6 +45,7 @@ import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,12 +138,12 @@ public class SocketCollector implements Closeable, 
Runnable {
             LOGGER.warn("Can't prepare data for the dispatcher", e);
         }
 
-        String topic = loggerName2Topic(loggingEvent.getLoggerName());
-        Event event = new Event(topic, data);
+        String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/";
+        Event event = new Event(loggerName2Topic(topic, 
loggingEvent.getLoggerName()), data);
         dispatcher.postEvent(event);
     }
 
-    static String loggerName2Topic(String loggerName) {
+    static String loggerName2Topic(String topic, String loggerName) {
         StringBuilder out = new StringBuilder();
         for (int c = 0; c < loggerName.length(); c++) {
             Character ch = loggerName.charAt(c);
@@ -156,7 +157,7 @@ public class SocketCollector implements Closeable, Runnable 
{
         while (outSt.length() > 1 && outSt.endsWith("/")) {
             outSt = outSt.substring(0, outSt.length() - 1);
         }
-        return "decanter/collect/log/" + outSt.replace(".", "/");
+        return topic + outSt.replace(".", "/");
     }
 
     private void putLocation(Map<String, Object> data, LocationInfo loc) {
diff --git 
a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
 
b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
index 89e1688..2e462c9 100644
--- 
a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
+++ 
b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
@@ -73,7 +73,7 @@ public class SocketCollectorTest {
 
     @Test
     public void testLoggerName2Topic() {
-        String topic = 
SocketCollector.loggerName2Topic("test.[Tomcat].[localhost].[/]");
+        String topic = 
SocketCollector.loggerName2Topic("decanter/collect/log/", 
"test.[Tomcat].[localhost].[/]");
         Assert.assertEquals("decanter/collect/log/test/Tomcat/localhost", 
topic);
     }
 
diff --git 
a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
 
b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
index e02a027..1148ba9 100644
--- 
a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
+++ 
b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
@@ -24,6 +24,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,11 +82,7 @@ public class OpenstackCollector implements Runnable {
 
     public void activate(Dictionary<String, Object> config) throws Exception {
         this.config = config;
-        if (config.get("topic") != null) {
-            topic = (String) config.get("topic");
-        } else {
-            topic = "decanter/collect/openstack";
-        }
+        topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) 
config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/openstack";
         if (config.get("openstack.identity") == null) {
             throw new IllegalStateException("openstack.identity is not 
configured");
         }
diff --git 
a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
 
b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
index e725fe3..b555958 100644
--- 
a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
+++ 
b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
@@ -23,6 +23,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import oshi.SystemInfo;
@@ -368,7 +369,9 @@ public class OshiCollector implements Runnable {
 
             PropertiesPreparator.prepare(data, properties);
 
-            dispatcher.postEvent(new Event("decanter/collect/oshi", data));
+            String topic = (properties.get(EventConstants.EVENT_TOPIC) != 
null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : 
"decanter/collect/oshi";
+
+            dispatcher.postEvent(new Event(topic, data));
         } catch (Exception e) {
             LOGGER.warn("Can't get oshi system metrics", e);
         }
diff --git 
a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
 
b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
index e31b594..386c455 100644
--- 
a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
+++ 
b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
@@ -23,6 +23,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +91,8 @@ public class PrometheusCollector implements Runnable {
                 }
             }
             PropertiesPreparator.prepare(data, properties);
-            dispatcher.postEvent(new Event("decanter/collect/prometheus", 
data));
+            String topic = (properties.get(EventConstants.EVENT_TOPIC) != 
null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : 
"decanter/collect/prometheus";
+            dispatcher.postEvent(new Event(topic, data));
         } catch (Exception e) {
             LOGGER.warn("Can't get Prometheus metrics", e);
             e.printStackTrace();
diff --git 
a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
 
b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
index 8dff199..d604be3 100644
--- 
a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
+++ 
b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
@@ -24,6 +24,7 @@ import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.redisson.Redisson;
 import org.redisson.api.RMap;
 import org.redisson.api.RedissonClient;
@@ -102,7 +103,8 @@ public class RedisCollector implements Runnable {
         } catch (Exception e) {
             LOGGER.warn("Can't prepare data", e);
         }
-        dispatcher.postEvent(new Event("decanter/collect/redis", data));
+        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/redis";
+        dispatcher.postEvent(new Event(topic, data));
     }
 
 }
diff --git 
a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
 
b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
index d27d3c2..48908fc 100644
--- 
a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
+++ 
b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
@@ -39,6 +39,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +62,7 @@ public class RestServletCollector extends HttpServlet {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(RestServletCollector.class);
 
-    private String baseTopic;
+    private String topic;
     private Dictionary<String, Object> properties;
     private long maxRequestSize = 100000;
 
@@ -69,7 +70,7 @@ public class RestServletCollector extends HttpServlet {
     @Activate
     public void activate(ComponentContext context) throws 
MalformedURLException {
         Dictionary<String, Object> props = context.getProperties();
-        this.baseTopic = getProperty(props, "topic", 
"decanter/collect/rest-servlet");
+        this.topic = getProperty(props, EventConstants.EVENT_TOPIC, 
"decanter/collect/rest-servlet");
         this.properties = props;
         if (this.properties.get("max.request.size") != null) {
             maxRequestSize = 
Long.parseLong((String)this.properties.get("max.request.size"));
@@ -101,7 +102,7 @@ public class RestServletCollector extends HttpServlet {
 
             PropertiesPreparator.prepare(data, properties);
 
-            Event event = new Event(baseTopic, data);
+            Event event = new Event(topic, data);
             dispatcher.postEvent(event);
             resp.setStatus(HttpServletResponse.SC_CREATED);
             LOGGER.debug("Karaf Decanter REST Servlet Collector harvesting 
done");
diff --git 
a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
 
b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
index 57e674d..4d534bf 100644
--- 
a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
+++ 
b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
@@ -32,6 +32,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +78,7 @@ public class RestCollector implements Runnable {
         this.config = config;
         this.url = new URL(getProperty(config, "url", 
"http://localhost:8181";));
         this.paths = getProperty(config, "paths", "").split(",");
-        this.topic = getProperty(config, "topic", "decanter/collect/rest");
+        this.topic = getProperty(config, EventConstants.EVENT_TOPIC, 
"decanter/collect/rest");
         this.requestMethod = getProperty(config, "request.method", "GET");
         this.user = getProperty(config, "user", null);
         this.password = getProperty(config, "password", null);
diff --git 
a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
 
b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
index 6779680..04ad87d 100644
--- 
a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
+++ 
b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
@@ -21,6 +21,7 @@ import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.*;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.snmp4j.*;
@@ -238,7 +239,8 @@ public class SnmpPoller implements ResponseListener, 
Runnable {
             data.put(variableBinding.getOid().toString(), 
variableBinding.getVariable().toString());
         }
         // send event
-        dispatcher.postEvent(new Event("decanter/collector/snmp", data));
+        String topic = (configuration.get(EventConstants.EVENT_TOPIC) != null) 
? (String) configuration.get(EventConstants.EVENT_TOPIC) : 
"decanter/collector/snmp";
+        dispatcher.postEvent(new Event(topic, data));
     }
 
     private OctetString convertToOctetString(String value) {
diff --git 
a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
 
b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
index fb6d6e1..a7ebeff 100644
--- 
a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
+++ 
b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
@@ -34,6 +34,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +71,7 @@ public class SoapCollector implements Runnable {
         if (config.get("url") == null) {
             throw new IllegalArgumentException("url property is mandatory");
         }
-        this.topic = "decanter/collect/soap";
-        if (config.get("topic") != null) {
-            this.topic = (String) config.get("topic");
-        }
+        topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) 
config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/soap";
         url = new URL((String) config.get("url"));
         if (config.get("soap.request") == null) {
             throw new IllegalStateException("soap.request property is 
mandatory");
diff --git 
a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
 
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 996dfa5..cb1f210 100644
--- 
a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ 
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -57,7 +57,7 @@ public class SocketCollector implements Closeable, Runnable {
     private boolean open;
     private ExecutorService executor;
     private Dictionary<String, Object> properties;
-    private String eventAdminTopic;
+    private String topic;
     private long maxRequestSize = 100000;
     
     @Reference
@@ -85,7 +85,7 @@ public class SocketCollector implements Closeable, Runnable {
             this.protocol = Protocol.TCP;
         }
 
-        eventAdminTopic = getProperty(this.properties, 
EventConstants.EVENT_TOPIC, "decanter/collect/socket");
+        topic = getProperty(this.properties, EventConstants.EVENT_TOPIC, 
"decanter/collect/socket");
 
         switch (protocol) {
             case TCP:
@@ -181,7 +181,7 @@ public class SocketCollector implements Closeable, Runnable 
{
                         data.put("type", "socket");
                         data.putAll(unmarshaller.unmarshal(new 
ByteArrayInputStream(line.getBytes())));
                         PropertiesPreparator.prepare(data, properties);
-                        Event event = new Event(eventAdminTopic, data);
+                        Event event = new Event(topic, data);
                         dispatcher.postEvent(event);
                     }
                 }
@@ -217,7 +217,7 @@ public class SocketCollector implements Closeable, Runnable 
{
                     LOGGER.warn("Can't prepare data for the dispatcher", e);
                 }
 
-                Event event = new Event(eventAdminTopic, data);
+                Event event = new Event(topic, data);
                 dispatcher.postEvent(event);
                 datagramSocket.send(packet);
             } catch (EOFException e) {
diff --git 
a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
 
b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
index 599cf49..d21ba40 100644
--- 
a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
+++ 
b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
@@ -39,6 +39,7 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,7 @@ public class SystemCollector implements Runnable {
     @Activate
     public void activate(ComponentContext context) {
         this.properties = context.getProperties();
-        this.topic = context.getProperties().get("topic") != null ? 
String.class.cast(context.getProperties().get("topic")) : 
"decanter/collect/system/";
+        this.topic = context.getProperties().get(EventConstants.EVENT_TOPIC) 
!= null ? 
String.class.cast(context.getProperties().get(EventConstants.EVENT_TOPIC)) : 
"decanter/collect/system/";
         if (!this.topic.endsWith("/")) {
             this.topic = this.topic + "/";
         }
diff --git 
a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
 
b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
index 713ef91..38255a9 100644
--- 
a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
+++ 
b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
@@ -22,7 +22,6 @@ import org.apache.camel.core.osgi.OsgiClassResolver;
 import org.apache.camel.core.osgi.OsgiDataFormatResolver;
 import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
 import org.apache.camel.core.osgi.OsgiLanguageResolver;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.karaf.decanter.collector.camel.DecanterEventNotifier;
 import org.apache.karaf.decanter.collector.camel.DecanterInterceptStrategy;
 import org.apache.karaf.itests.KarafTestSupport;
@@ -136,7 +135,7 @@ public class CamelCollectorTest extends KarafTestSupport {
         // create route with notifier
         EventAdmin eventAdmin = getOsgiService(EventAdmin.class);
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
 
         RouteBuilder builder = new RouteBuilder() {
             @Override
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc 
b/manual/src/main/asciidoc/user-guide/collectors.adoc
index 4f45f64..0a986de 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -1242,6 +1242,18 @@ unmarshaller.target=(dataFormat=json)
 
 You have to define the locations of the OpenStack APIs and if you enabled 
requesting the APIs or not.
 
+==== Target dispatcher topics
+
+All collectors use a default Decanter dispatcher topic name. However, you can 
change the topic name with the one of your choice.
+
+For that, you have to set `event.topics` property in the collector 
configuration.
+
+For instance:
+
+----
+event.topics=my/topic/name
+----
+
 ==== Customizing properties in collectors
 
 You can add, rename or remove properties collected by the collectors before 
sending it to the dispatcher.
diff --git 
a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
 
b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
index 78c8284..bb68296 100644
--- 
a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
+++ 
b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
@@ -43,10 +43,7 @@ public class PassThroughProcessor implements EventHandler {
     @Activate
     public void activate(ComponentContext componentContext) {
         Dictionary<String, Object> properties = 
componentContext.getProperties();
-        String targetTopic = "decanter/process/passthrough";
-        if (properties.get("target.topics") != null) {
-            targetTopic = properties.get("target.topics").toString();
-        }
+        targetTopic = (properties.get("target.topics") != null) ? (String) 
properties.get("target.topics") : "decanter/process/passthrough";
         activate(targetTopic);
     }
 

Reply via email to