This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/main by this push: new 05ad6c1 [KARAF-7170] Allow to define event.topics property in all collectors new 6d38c21 Merge pull request #267 from jbonofre/KARAF-7170 05ad6c1 is described below commit 05ad6c12a4137cc3cc40d35bfecfb78447cd1345 Author: Jean-Baptiste Onofré <jbono...@apache.org> AuthorDate: Wed Sep 8 16:50:57 2021 +0200 [KARAF-7170] Allow to define event.topics property in all collectors --- .../collector/camel/DecanterEventNotifier.java | 21 ++++++++++++----- .../collector/camel/DecanterInterceptStrategy.java | 11 ++++++++- .../collector/camel/DecanterEventNotifierTest.java | 11 ++++----- .../configadmin/ConfigAdminCollector.java | 11 +++++---- collector/dropwizard/pom.xml | 4 ++++ .../dropwizard/DecanterReporterCollector.java | 27 +++++++++++++++++++++- .../decanter/collector/druid/DruidCollector.java | 3 ++- .../elasticsearch/ElasticsearchCollector.java | 21 +++++++++-------- .../collector/eventadmin/EventCollector.java | 7 +++--- .../collector/file/DecanterTailerListener.java | 6 ++++- .../decanter/collector/jdbc/JdbcCollector.java | 10 ++++---- .../decanter/collector/jdbc/TestJdbcCollector.java | 2 +- .../jetty/DecanterCollectorJettyHandler.java | 4 +++- .../karaf/decanter/collector/jmx/JmxCollector.java | 7 ++++-- .../karaf/decanter/collector/log/LogCollector.java | 5 ++-- .../collector/log/socket/SocketCollector.java | 9 ++++---- .../collector/log/socket/SocketCollectorTest.java | 2 +- .../collector/openstack/OpenstackCollector.java | 7 ++---- .../decanter/collector/oshi/OshiCollector.java | 5 +++- .../collector/prometheus/PrometheusCollector.java | 4 +++- .../decanter/collector/redis/RedisCollector.java | 4 +++- .../rest/servlet/RestServletCollector.java | 7 +++--- .../decanter/collector/rest/RestCollector.java | 3 ++- .../karaf/decanter/collector/snmp/SnmpPoller.java | 4 +++- .../decanter/collector/soap/SoapCollector.java | 6 ++--- .../decanter/collector/socket/SocketCollector.java | 8 +++---- 
.../decanter/collector/system/SystemCollector.java | 3 ++- .../itests/collector/CamelCollectorTest.java | 3 +-- .../src/main/asciidoc/user-guide/collectors.adoc | 12 ++++++++++ .../passthrough/PassThroughProcessor.java | 5 +--- 30 files changed, 156 insertions(+), 76 deletions(-) diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java index 4aa8eea..7f0b7a7 100644 --- a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java +++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java @@ -32,18 +32,27 @@ public class DecanterEventNotifier extends EventNotifierSupport { private static final Logger LOG = LoggerFactory.getLogger(DecanterEventNotifier.class.getName()); - private EventAdmin eventAdmin; + private EventAdmin dispatcher; + private String topic = "decanter/collect/camel/event"; private String camelContextMatcher = ".*"; private String routeMatcher = ".*"; private DefaultExchangeExtender dextender = new DefaultExchangeExtender(); private DecanterCamelEventExtender extender; - public EventAdmin getEventAdmin() { - return eventAdmin; + public EventAdmin getDispatcher() { + return dispatcher; } - public void setEventAdmin(EventAdmin eventAdmin) { - this.eventAdmin = eventAdmin; + public void setDispatcher(EventAdmin dispatcher) { + this.dispatcher = dispatcher; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; } public void setCamelContextMatcher(String camelContextMatcher) { @@ -151,7 +160,7 @@ public class DecanterEventNotifier extends EventNotifierSupport { extender.extend(eventMap, (Exchange) source); } } - eventAdmin.postEvent(new Event("decanter/collect/camel/event", eventMap)); + dispatcher.postEvent(new Event(topic, eventMap)); } catch (Exception 
ex) { LOG.warn("Failed to handle event", ex); } diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java index 4b7cad8..5ff6bae 100644 --- a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java +++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java @@ -31,6 +31,7 @@ import org.osgi.service.event.EventAdmin; public class DecanterInterceptStrategy implements InterceptStrategy { private EventAdmin dispatcher; + private String topic = "decanter/collect/camel/tracer"; private DefaultExchangeExtender dextender = new DefaultExchangeExtender(); private DecanterCamelEventExtender extender; @@ -72,7 +73,7 @@ public class DecanterInterceptStrategy implements InterceptStrategy { data.put(header.substring("decanter.".length()), exchange.getIn().getHeader(header)); } } - Event event = new Event("decanter/collect/camel/tracer", data); + Event event = new Event(topic, data); dispatcher.postEvent(event); } @@ -87,6 +88,14 @@ public class DecanterInterceptStrategy implements InterceptStrategy { public void setDispatcher(EventAdmin dispatcher) { this.dispatcher = dispatcher; } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } public void setIncludeBody(boolean includeBody) { dextender.setIncludeBody(includeBody); diff --git a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java index 83cb37f..036f9d3 100644 --- a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java +++ 
b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java @@ -20,7 +20,6 @@ import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.spi.CamelEvent; import org.junit.Assert; import org.junit.Test; import org.osgi.service.event.Event; @@ -33,7 +32,7 @@ public class DecanterEventNotifierTest { public void testEventNotifier() throws Exception { MockEventAdmin eventAdmin = new MockEventAdmin(); DecanterEventNotifier notifier = new DecanterEventNotifier(); - notifier.setEventAdmin(eventAdmin); + notifier.setDispatcher(eventAdmin); DefaultCamelContext camelContext = createCamelContext(notifier); @@ -73,7 +72,7 @@ public class DecanterEventNotifierTest { public void testCamelContextFilter() throws Exception { MockEventAdmin eventAdmin = new MockEventAdmin(); DecanterEventNotifier notifier = new DecanterEventNotifier(); - notifier.setEventAdmin(eventAdmin); + notifier.setDispatcher(eventAdmin); notifier.setCamelContextMatcher("foo"); DefaultCamelContext camelContext = createCamelContext(notifier); @@ -88,7 +87,7 @@ public class DecanterEventNotifierTest { public void testRouteIdFilter() throws Exception { MockEventAdmin eventAdmin = new MockEventAdmin(); DecanterEventNotifier notifier = new DecanterEventNotifier(); - notifier.setEventAdmin(eventAdmin); + notifier.setDispatcher(eventAdmin); notifier.setCamelContextMatcher(".*"); notifier.setRouteMatcher("foo"); @@ -104,7 +103,7 @@ public class DecanterEventNotifierTest { public void testIgnoredEvents() throws Exception { MockEventAdmin eventAdmin = new MockEventAdmin(); DecanterEventNotifier notifier = new DecanterEventNotifier(); - notifier.setEventAdmin(eventAdmin); + notifier.setDispatcher(eventAdmin); notifier.setIgnoreCamelContextEvents(true); DefaultCamelContext camelContext = createCamelContext(notifier); @@ -121,7 +120,7 @@ public 
class DecanterEventNotifierTest { DecanterEventNotifier notifier = new DecanterEventNotifier(); notifier.setIgnoreCamelContextEvents(true); notifier.setIgnoreRouteEvents(true); - notifier.setEventAdmin(eventAdmin); + notifier.setDispatcher(eventAdmin); notifier.setExtender(new TestExtender()); DefaultCamelContext camelContext = createCamelContext(notifier); diff --git a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java index 9757e0d..988bc25 100644 --- a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java +++ b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java @@ -27,6 +27,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ public class ConfigAdminCollector implements ConfigurationListener { @Reference private ConfigurationAdmin configurationAdmin; - private Dictionary<String, Object> properties; + private Dictionary<String, Object> config; @Activate public void activate(ComponentContext componentContext) { @@ -57,7 +58,7 @@ public class ConfigAdminCollector implements ConfigurationListener { } public void activate(Dictionary<String, Object> properties) { - this.properties = properties; + this.config = properties; } @Override @@ -94,12 +95,14 @@ public class ConfigAdminCollector implements ConfigurationListener { } try { - PropertiesPreparator.prepare(data, properties); + PropertiesPreparator.prepare(data, config); } catch (Exception e) { // nothing to do } - dispatcher.postEvent(new 
Event("decanter/collect/configadmin", data)); + String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/configadmin"; + + dispatcher.postEvent(new Event(topic, data)); } } diff --git a/collector/dropwizard/pom.xml b/collector/dropwizard/pom.xml index d922421..56d7ff7 100644 --- a/collector/dropwizard/pom.xml +++ b/collector/dropwizard/pom.xml @@ -35,6 +35,10 @@ <dependencies> <dependency> + <groupId>org.apache.karaf.decanter.collector</groupId> + <artifactId>org.apache.karaf.decanter.collector.utils</artifactId> + </dependency> + <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> <version>4.2.3</version> diff --git a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java index c3b175b..571a9c0 100644 --- a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java +++ b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java @@ -19,13 +19,18 @@ package org.apache.karaf.decanter.collector.dropwizard; import com.codahale.metrics.*; +import org.apache.karaf.decanter.collector.utils.PropertiesPreparator; +import org.osgi.service.component.ComponentContext; +import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.component.annotations.ReferenceCardinality; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import java.net.InetAddress; +import java.util.Dictionary; import java.util.HashMap; import java.util.Map; @@ -45,6 +50,17 @@ public class 
DecanterReporterCollector implements Runnable { @Reference(cardinality = ReferenceCardinality.OPTIONAL) public MetricSet metricRegistry; + public Dictionary<String, Object> config; + + @Activate + public void activate(ComponentContext componentContext) { + activate(componentContext.getProperties()); + } + + public void activate(Dictionary<String, Object> config) { + this.config = config; + } + @Override public void run() { Map<String, Metric> metrics = metricRegistry.getMetrics(); @@ -95,7 +111,16 @@ public class DecanterReporterCollector implements Runnable { data.put("Mean Rate", timer.getMeanRate()); populateSnapshot(timer.getSnapshot(), data); } - Event event = new Event("decanter/collect/dropwizard", data); + + try { + PropertiesPreparator.prepare(data, config); + } catch (Exception e) { + // nothing to do + } + + String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/dropwizard"; + + Event event = new Event(topic, data); dispatcher.postEvent(event); } } diff --git a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java index 5122bfb..c3a0f08 100644 --- a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java +++ b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java @@ -24,6 +24,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,7 @@ public class DruidCollector implements Runnable { data.put("query", key.substring("query.".length())); data.putAll(executeQuery(druidBroker, (String) config.get(key))); 
PropertiesPreparator.prepare(data, config); - String topic = (config.get("topic") != null) ? (String) config.get("topic") : "decanter/collect/druid"; + String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/druid"; dispatcher.postEvent(new Event(topic, data)); } catch (Exception e) { LOGGER.warn("Can't execute query {}", key.substring("query.".length()), e); diff --git a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java index 03cad36..8ef9176 100644 --- a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java +++ b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java @@ -43,6 +43,7 @@ import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,7 @@ public class ElasticsearchCollector implements Runnable { @Reference private EventAdmin dispatcher; - private Dictionary<String, Object> configuration; + private Dictionary<String, Object> config; private RestHighLevelClient restClient; @Activate @@ -74,7 +75,7 @@ public class ElasticsearchCollector implements Runnable { } public void activate(Dictionary<String, Object> configuration) { - this.configuration = configuration; + this.config = configuration; String addressesString = (configuration.get("addresses") != null) ? configuration.get("addresses").toString() : "http://localhost:9200"; String username = (configuration.get("username") != null) ? 
configuration.get("username").toString() : null; String password = (configuration.get("password") != null) ? configuration.get("password").toString() : null; @@ -126,12 +127,12 @@ public class ElasticsearchCollector implements Runnable { public void run() { SearchRequest searchRequest = new SearchRequest(); - String index = (configuration.get("index") != null) ? configuration.get("index").toString() : "decanter"; + String index = (config.get("index") != null) ? config.get("index").toString() : "decanter"; searchRequest.indices(index); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - String query = (configuration.get("query") != null) ? configuration.get("query").toString() : null; + String query = (config.get("query") != null) ? config.get("query").toString() : null; QueryBuilder queryBuilder; if (query == null) { queryBuilder = QueryBuilders.matchAllQuery(); @@ -139,17 +140,17 @@ public class ElasticsearchCollector implements Runnable { queryBuilder = QueryBuilders.queryStringQuery(query); } searchSourceBuilder.query(queryBuilder); - String fromString = (configuration.get("from") != null) ? configuration.get("from").toString() : null; + String fromString = (config.get("from") != null) ? config.get("from").toString() : null; if (fromString != null) { int from = Integer.parseInt(fromString); searchSourceBuilder.from(from); } - String sizeString = (configuration.get("size") != null) ? configuration.get("size").toString() : null; + String sizeString = (config.get("size") != null) ? config.get("size").toString() : null; if (sizeString != null) { int size = Integer.parseInt(sizeString); searchSourceBuilder.size(size); } - String timeoutString = (configuration.get("timeout") != null) ? configuration.get("timeout").toString() : null; + String timeoutString = (config.get("timeout") != null) ? 
config.get("timeout").toString() : null; if (timeoutString != null) { int timeout = Integer.parseInt(timeoutString); searchSourceBuilder.timeout(new TimeValue(timeout, TimeUnit.SECONDS)); @@ -176,12 +177,14 @@ public class ElasticsearchCollector implements Runnable { LOGGER.error("Can't query elasticsearch", e); } try { - PropertiesPreparator.prepare(data, configuration); + PropertiesPreparator.prepare(data, config); } catch (Exception e) { LOGGER.warn("Can't prepare event", e); } - dispatcher.postEvent(new Event("decanter/collect/elasticsearch", data)); + String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/elasticsearch"; + + dispatcher.postEvent(new Event(topic, data)); } /** diff --git a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java index 44d79b0..b29aa33 100644 --- a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java +++ b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java @@ -23,6 +23,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.osgi.service.event.EventHandler; import javax.security.auth.Subject; @@ -38,11 +39,11 @@ public class EventCollector implements EventHandler { @Reference public EventAdmin dispatcher; - private Dictionary<String, Object> properties; + private Dictionary<String, Object> config; @Activate public void activate(ComponentContext context) { - properties = context.getProperties(); + config = context.getProperties(); } @Override @@ -68,7 +69,7 @@ public class EventCollector implements EventHandler { 
} try { - PropertiesPreparator.prepare(data, properties); + PropertiesPreparator.prepare(data, config); } catch (Exception e) { // nothing to do } diff --git a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java index 8f9460d..8cbc143 100644 --- a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java +++ b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java @@ -36,6 +36,7 @@ import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,8 @@ public class DecanterTailerListener extends TailerListenerAdapter { private String path; private String regex; private Pattern compiledRegex; + + private String topic; /** * additional properties provided by the user @@ -88,6 +91,7 @@ public class DecanterTailerListener extends TailerListenerAdapter { if (regex != null) { compiledRegex = Pattern.compile(regex); } + topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/file/"; } @Deactivate @@ -120,7 +124,7 @@ public class DecanterTailerListener extends TailerListenerAdapter { LOGGER.warn("Can't fully prepare data for the dispatcher", e); } - Event event = new Event("decanter/collect/file/" + type, data); + Event event = new Event(topic + type, data); dispatcher.postEvent(event); } diff --git a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java index 04dbd0c..ef9d4e7 100644 --- a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java +++ b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java @@ -58,7 +58,7 @@ public class JdbcCollector implements Runnable { private final static Logger LOGGER = LoggerFactory.getLogger(JdbcCollector.class); private String query; - private String dispatcherTopic; + private String topic; private Dictionary<String, Object> properties; private Connection connection; private PreparedStatement preparedStatement; @@ -66,15 +66,15 @@ public class JdbcCollector implements Runnable { @Activate public void activate(ComponentContext context) throws Exception { properties = context.getProperties(); - open(properties); + activate(properties); } - public void open(Dictionary<String, Object> config) throws Exception { + public void activate(Dictionary<String, Object> config) throws Exception { query = getProperty(config, "query", null); if (query == null) { throw new IllegalStateException("Query is mandatory"); } - dispatcherTopic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc"); + topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc"); connection = dataSource.getConnection(); preparedStatement = connection.prepareStatement(query); @@ -101,7 +101,7 @@ public class JdbcCollector implements Runnable 
{ List<Map<String, Object>> dataRows = query(); for (Map<String, Object> data : dataRows) { - Event event = new Event(dispatcherTopic, data); + Event event = new Event(topic, data); dispatcher.postEvent(event); } } diff --git a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java index 1d2ecb6..8a464ab 100644 --- a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java +++ b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java @@ -62,7 +62,7 @@ public class TestJdbcCollector { collector.dataSource = dataSource; Dictionary<String, Object> config = new Hashtable<>(); config.put("query", "select * from TEST"); - collector.open(config); + collector.activate(config); List<Map<String, Object>> dataRows = collector.query(); Assert.assertEquals(2, dataRows.size()); diff --git a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java index 4573bbf..8e98d7e 100644 --- a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java +++ b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java @@ -26,6 +26,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -155,7 +156,8 @@ public class DecanterCollectorJettyHandler implements Handler { } catch (Exception e) { // nothing to do } - Event event = new Event("decanter/collect/jetty", data); + 
String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jetty"; + Event event = new Event(topic, data); dispatcher.postEvent(event); } diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java index 495cdad..35a3b0a 100644 --- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java +++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java @@ -38,6 +38,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,13 +150,15 @@ public class JmxCollector implements Runnable { names.addAll(connection.queryNames(getObjectName(null), null)); } + String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jmx/"; + for (ObjectName name : names) { LOGGER.debug("Harvesting {}", name); try { Map<String, Object> data = harvester.harvestBean(name); PropertiesPreparator.prepare(data, properties); data.put("host", host); - Event event = new Event("decanter/collect/jmx/" + this.type + "/" + getTopic(name), data); + Event event = new Event(topic + this.type + "/" + getTopic(name), data); LOGGER.debug("Posting for {}", name); dispatcher.postEvent(event); } catch (Exception e) { @@ -174,7 +177,7 @@ public class JmxCollector implements Runnable { Map<String, Object> data = harvester.executeOperation(operation, objectName, operationName, arguments, signatures); PropertiesPreparator.prepare(data, properties); data.put("host", host); - Event event = new Event("decanter/collect/jmx/" + this.type + "/" + getTopic(objectName), data); + Event event = new Event(topic + this.type + "/" + getTopic(objectName), data); dispatcher.postEvent(event); } else { LOGGER.warn("{} is not well configured ({})", operation, raw); diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java index 98d64de..5ca4ff2 100644 --- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java +++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java @@ -33,6 +33,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,8 +117,8 @@ public class LogCollector implements PaxAppender { if (loggerName == null || loggerName.isEmpty()) { loggerName = "default"; } - String topic = "decanter/collect/log/" + 
cleanLoggerName(loggerName); - this.dispatcher.postEvent(new Event(topic, data)); + String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/"; + this.dispatcher.postEvent(new Event(topic + cleanLoggerName(loggerName), data)); } /* diff --git a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java index 11a0486..ce79648 100644 --- a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java +++ b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java @@ -45,6 +45,7 @@ import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,12 +138,12 @@ public class SocketCollector implements Closeable, Runnable { LOGGER.warn("Can't prepare data for the dispatcher", e); } - String topic = loggerName2Topic(loggingEvent.getLoggerName()); - Event event = new Event(topic, data); + String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/"; + Event event = new Event(loggerName2Topic(topic, loggingEvent.getLoggerName()), data); dispatcher.postEvent(event); } - static String loggerName2Topic(String loggerName) { + static String loggerName2Topic(String topic, String loggerName) { StringBuilder out = new StringBuilder(); for (int c = 0; c < loggerName.length(); c++) { Character ch = loggerName.charAt(c); @@ -156,7 +157,7 @@ public class SocketCollector implements Closeable, Runnable { while (outSt.length() > 1 && outSt.endsWith("/")) { outSt = outSt.substring(0, outSt.length() - 1); } - return "decanter/collect/log/" + outSt.replace(".", "/"); + return topic + outSt.replace(".", "/"); } private void putLocation(Map<String, Object> data, LocationInfo loc) { diff --git a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java index 89e1688..2e462c9 100644 --- a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java +++ b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java @@ -73,7 +73,7 @@ public class SocketCollectorTest { @Test public void testLoggerName2Topic() { - String topic = SocketCollector.loggerName2Topic("test.[Tomcat].[localhost].[/]"); + String topic = SocketCollector.loggerName2Topic("decanter/collect/log/", "test.[Tomcat].[localhost].[/]"); Assert.assertEquals("decanter/collect/log/test/Tomcat/localhost", topic); } diff --git a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java index e02a027..1148ba9 100644 --- 
a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java +++ b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java @@ -24,6 +24,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,11 +82,7 @@ public class OpenstackCollector implements Runnable { public void activate(Dictionary<String, Object> config) throws Exception { this.config = config; - if (config.get("topic") != null) { - topic = (String) config.get("topic"); - } else { - topic = "decanter/collect/openstack"; - } + topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/openstack"; if (config.get("openstack.identity") == null) { throw new IllegalStateException("openstack.identity is not configured"); } diff --git a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java index e725fe3..b555958 100644 --- a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java +++ b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java @@ -23,6 +23,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import oshi.SystemInfo; @@ -368,7 +369,9 @@ public class OshiCollector implements Runnable { PropertiesPreparator.prepare(data, properties); - dispatcher.postEvent(new 
Event("decanter/collect/oshi", data)); + String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/oshi"; + + dispatcher.postEvent(new Event(topic, data)); } catch (Exception e) { LOGGER.warn("Can't get oshi system metrics", e); } diff --git a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java index e31b594..386c455 100644 --- a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java +++ b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java @@ -23,6 +23,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,8 @@ public class PrometheusCollector implements Runnable { } } PropertiesPreparator.prepare(data, properties); - dispatcher.postEvent(new Event("decanter/collect/prometheus", data)); + String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/prometheus"; + dispatcher.postEvent(new Event(topic, data)); } catch (Exception e) { LOGGER.warn("Can't get Prometheus metrics", e); e.printStackTrace(); diff --git a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java index 8dff199..d604be3 100644 --- a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java +++ b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java @@ -24,6 +24,7 @@ import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.redisson.Redisson; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; @@ -102,7 +103,8 @@ public class RedisCollector implements Runnable { } catch (Exception e) { LOGGER.warn("Can't prepare data", e); } - dispatcher.postEvent(new Event("decanter/collect/redis", data)); + String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/redis"; + dispatcher.postEvent(new Event(topic, data)); } } diff --git a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java index d27d3c2..48908fc 100644 --- a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java +++ b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java @@ -39,6 +39,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,7 @@ public class RestServletCollector extends HttpServlet { private final static Logger LOGGER = LoggerFactory.getLogger(RestServletCollector.class); - private String baseTopic; + private String topic; private Dictionary<String, Object> properties; private long maxRequestSize = 100000; @@ -69,7 +70,7 @@ public class RestServletCollector extends HttpServlet { @Activate public void activate(ComponentContext context) throws MalformedURLException { Dictionary<String, Object> props = context.getProperties(); - this.baseTopic = getProperty(props, "topic", "decanter/collect/rest-servlet"); + this.topic = getProperty(props, EventConstants.EVENT_TOPIC, "decanter/collect/rest-servlet"); this.properties = props; if (this.properties.get("max.request.size") != null) { maxRequestSize = Long.parseLong((String)this.properties.get("max.request.size")); @@ -101,7 +102,7 @@ public class RestServletCollector extends HttpServlet { PropertiesPreparator.prepare(data, properties); - Event event = new Event(baseTopic, data); + Event event = new 
Event(topic, data); dispatcher.postEvent(event); resp.setStatus(HttpServletResponse.SC_CREATED); LOGGER.debug("Karaf Decanter REST Servlet Collector harvesting done"); diff --git a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java index 57e674d..4d534bf 100644 --- a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java +++ b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java @@ -32,6 +32,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,7 @@ public class RestCollector implements Runnable { this.config = config; this.url = new URL(getProperty(config, "url", "http://localhost:8181")); this.paths = getProperty(config, "paths", "").split(","); - this.topic = getProperty(config, "topic", "decanter/collect/rest"); + this.topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/rest"); this.requestMethod = getProperty(config, "request.method", "GET"); this.user = getProperty(config, "user", null); this.password = getProperty(config, "password", null); diff --git a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java index 6779680..04ad87d 100644 --- a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java +++ b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java @@ -21,6 +21,7 @@ import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.*; import 
org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.snmp4j.*; @@ -238,7 +239,8 @@ public class SnmpPoller implements ResponseListener, Runnable { data.put(variableBinding.getOid().toString(), variableBinding.getVariable().toString()); } // send event - dispatcher.postEvent(new Event("decanter/collector/snmp", data)); + String topic = (configuration.get(EventConstants.EVENT_TOPIC) != null) ? (String) configuration.get(EventConstants.EVENT_TOPIC) : "decanter/collector/snmp"; + dispatcher.postEvent(new Event(topic, data)); } private OctetString convertToOctetString(String value) { diff --git a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java index fb6d6e1..a7ebeff 100644 --- a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java +++ b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java @@ -34,6 +34,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,10 +71,7 @@ public class SoapCollector implements Runnable { if (config.get("url") == null) { throw new IllegalArgumentException("url property is mandatory"); } - this.topic = "decanter/collect/soap"; - if (config.get("topic") != null) { - this.topic = (String) config.get("topic"); - } + topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? 
(String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/soap"; url = new URL((String) config.get("url")); if (config.get("soap.request") == null) { throw new IllegalStateException("soap.request property is mandatory"); diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java index 996dfa5..cb1f210 100644 --- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java +++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java @@ -57,7 +57,7 @@ public class SocketCollector implements Closeable, Runnable { private boolean open; private ExecutorService executor; private Dictionary<String, Object> properties; - private String eventAdminTopic; + private String topic; private long maxRequestSize = 100000; @Reference @@ -85,7 +85,7 @@ public class SocketCollector implements Closeable, Runnable { this.protocol = Protocol.TCP; } - eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket"); + topic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket"); switch (protocol) { case TCP: @@ -181,7 +181,7 @@ public class SocketCollector implements Closeable, Runnable { data.put("type", "socket"); data.putAll(unmarshaller.unmarshal(new ByteArrayInputStream(line.getBytes()))); PropertiesPreparator.prepare(data, properties); - Event event = new Event(eventAdminTopic, data); + Event event = new Event(topic, data); dispatcher.postEvent(event); } } @@ -217,7 +217,7 @@ public class SocketCollector implements Closeable, Runnable { LOGGER.warn("Can't prepare data for the dispatcher", e); } - Event event = new Event(eventAdminTopic, data); + Event event = new Event(topic, data); dispatcher.postEvent(event); datagramSocket.send(packet); } catch (EOFException e) { diff --git 
a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java index 599cf49..d21ba40 100644 --- a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java +++ b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java @@ -39,6 +39,7 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,7 @@ public class SystemCollector implements Runnable { @Activate public void activate(ComponentContext context) { this.properties = context.getProperties(); - this.topic = context.getProperties().get("topic") != null ? String.class.cast(context.getProperties().get("topic")) : "decanter/collect/system/"; + this.topic = context.getProperties().get(EventConstants.EVENT_TOPIC) != null ? 
String.class.cast(context.getProperties().get(EventConstants.EVENT_TOPIC)) : "decanter/collect/system/"; if (!this.topic.endsWith("/")) { this.topic = this.topic + "/"; } diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java index 713ef91..38255a9 100644 --- a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java +++ b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java @@ -22,7 +22,6 @@ import org.apache.camel.core.osgi.OsgiClassResolver; import org.apache.camel.core.osgi.OsgiDataFormatResolver; import org.apache.camel.core.osgi.OsgiDefaultCamelContext; import org.apache.camel.core.osgi.OsgiLanguageResolver; -import org.apache.camel.impl.DefaultCamelContext; import org.apache.karaf.decanter.collector.camel.DecanterEventNotifier; import org.apache.karaf.decanter.collector.camel.DecanterInterceptStrategy; import org.apache.karaf.itests.KarafTestSupport; @@ -136,7 +135,7 @@ public class CamelCollectorTest extends KarafTestSupport { // create route with notifier EventAdmin eventAdmin = getOsgiService(EventAdmin.class); DecanterEventNotifier notifier = new DecanterEventNotifier(); - notifier.setEventAdmin(eventAdmin); + notifier.setDispatcher(eventAdmin); RouteBuilder builder = new RouteBuilder() { @Override diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc index 4f45f64..0a986de 100644 --- a/manual/src/main/asciidoc/user-guide/collectors.adoc +++ b/manual/src/main/asciidoc/user-guide/collectors.adoc @@ -1242,6 +1242,18 @@ unmarshaller.target=(dataFormat=json) You have to define the locations of the OpenStack APIs and if you enabled requesting the APIs or not. +==== Target dispatcher topics + +All collectors use a default Decanter dispatcher topic name. 
However, you can replace it with a topic name of your choice. + +To do so, set the `event.topics` property in the collector configuration. + +For instance: + +---- +event.topics=my/topic/name +---- + ==== Customizing properties in collectors You can add, rename or remove properties collected by the collectors before sending it to the dispatcher. diff --git a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java index 78c8284..bb68296 100644 --- a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java +++ b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java @@ -43,10 +43,7 @@ public class PassThroughProcessor implements EventHandler { @Activate public void activate(ComponentContext componentContext) { Dictionary<String, Object> properties = componentContext.getProperties(); - String targetTopic = "decanter/process/passthrough"; - if (properties.get("target.topics") != null) { - targetTopic = properties.get("target.topics").toString(); - } + targetTopic = (properties.get("target.topics") != null) ? (String) properties.get("target.topics") : "decanter/process/passthrough"; activate(targetTopic); }