Repository: karaf-decanter
Updated Branches:
  refs/heads/master f3f93755b -> b9438dc3e
Improved cassandra appender Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/72db51a4 Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/72db51a4 Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/72db51a4 Branch: refs/heads/master Commit: 72db51a498a9a9208c1ffb35eaa509f7ecef461a Parents: f3f9375 Author: Christian Schneider <[email protected]> Authored: Mon Aug 8 18:46:38 2016 +0200 Committer: Christian Schneider <[email protected]> Committed: Mon Aug 8 18:46:38 2016 +0200 ---------------------------------------------------------------------- appender/cassandra/pom.xml | 13 ++- .../appender/cassandra/CassandraAppender.java | 107 +++++++++---------- .../cassandra/CassandraAppenderTest.java | 16 ++- 3 files changed, 70 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/72db51a4/appender/cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/appender/cassandra/pom.xml b/appender/cassandra/pom.xml index 68ea240..861bae1 100644 --- a/appender/cassandra/pom.xml +++ b/appender/cassandra/pom.xml @@ -33,13 +33,10 @@ <dependencies> - <!-- Decanter API --> <dependency> <groupId>org.apache.karaf.decanter</groupId> <artifactId>org.apache.karaf.decanter.api</artifactId> </dependency> - - <!-- SLF4J --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> @@ -64,8 +61,14 @@ </exclusion> </exclusions> </dependency> - - <dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <version>1.7.21</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.cassandra</groupId> <artifactId>cassandra-all</artifactId> <version>${cassandra.version}</version> 
http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/72db51a4/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java ---------------------------------------------------------------------- diff --git a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java index 6175f87..e3e7d17 100644 --- a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java +++ b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java @@ -23,6 +23,7 @@ import org.apache.karaf.decanter.api.marshaller.Marshaller; import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.Event; import org.osgi.service.event.EventConstants; @@ -46,40 +47,41 @@ public class CassandraAppender implements EventHandler { private final static Logger LOGGER = LoggerFactory.getLogger(CassandraAppender.class); - private String cassandraHost; - private Integer cassandraPort; - private String cassandraUser; - private String cassandraPassword; private String keyspace; private String tableName; private Marshaller marshaller; - private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS TABLENAME (timeStamp timestamp PRIMARY KEY, content Text);"; + private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS %s (timeStamp timestamp PRIMARY KEY, content Text);"; - private final static String upsertQueryTemplate = "INSERT INTO TABLENAME(timeStamp, content) VALUES(?,?);"; + private final static String insertTemplate = "INSERT INTO %s (timeStamp, content) 
VALUES(?,?);"; + + private Cluster cluster; public CassandraAppender() { } - public CassandraAppender(Marshaller marshaller, String keyspace, String tableName, String cassandraHost, - Integer cassandraPort, String cassandraUser, String cassandraPassword) { - this.marshaller = marshaller; - this.keyspace = keyspace; - this.tableName = tableName; - this.cassandraHost = cassandraHost; - this.cassandraPort = cassandraPort; - this.cassandraUser = cassandraUser; - this.cassandraPassword = cassandraPassword; - } - @SuppressWarnings("unchecked") @Activate public void activate(ComponentContext context) { Dictionary<String, Object> config = context.getProperties(); + activate(config); + } + + void activate(Dictionary<String, Object> config) { this.keyspace = getValue(config, "keyspace.name", "decanter"); this.tableName = getValue(config, "table.name", "decanter"); - this.cassandraHost = getValue(config, "cassandra.host", "localhost"); - this.cassandraPort = Integer.parseInt(getValue(config, "cassandra.port", "9042")); + String host = getValue(config, "cassandra.host", "localhost"); + Integer port = Integer.parseInt(getValue(config, "cassandra.port", "9042")); + Builder clusterBuilder = Cluster.builder().addContactPoint(host); + if (port != null) { + clusterBuilder.withPort(port); + } + cluster = clusterBuilder.build(); + } + + @Deactivate + public void deactivate() { + cluster.close(); } private String getValue(Dictionary<String, Object> config, String key, String defaultValue) { @@ -90,40 +92,16 @@ public class CassandraAppender implements EventHandler { @Override public void handleEvent(Event event) { LOGGER.trace("Looking for the Cassandra datasource"); - try (Session session = createSession()){ - ResultSet execute; - try { - execute = session.execute("USE " + keyspace + ";"); - } catch (InvalidQueryException e) { - session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };"); - 
session.execute("USE " + keyspace + ";"); - } - - execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';"); - List<Row> all = execute.all(); - boolean found = false; - for(Row row : all) { - String table = row.getString("columnfamily_name"); - if (table.equalsIgnoreCase(tableName)) { - found = true; - break; - } - } - if (!found) { - session.execute(createTableTemplate.replace("TABLENAME", tableName)); - LOGGER.debug("Table {} has been created", tableName); - } + try (Session session = cluster.connect()){ + useKeyspace(session, keyspace); + createTable(session, keyspace, tableName); Long timestamp = (Long) event.getProperty("timestamp"); - java.util.Date date = timestamp != null ? new java.util.Date(timestamp) : new java.util.Date(); - String jsonSt = marshaller.marshal(event); - - String upsertQuery = upsertQueryTemplate.replaceAll("TABLENAME", tableName); - if (timestamp == null) { timestamp = System.currentTimeMillis(); } - session.execute(upsertQuery, timestamp, jsonSt); + String jsonSt = marshaller.marshal(event); + session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt); LOGGER.trace("Data inserted into {} table", tableName); } catch (Exception e) { @@ -131,17 +109,32 @@ public class CassandraAppender implements EventHandler { } } - private Session createSession() { - Session session; - Builder clusterBuilder = Cluster.builder().addContactPoint(cassandraHost); - if (cassandraPort != null) { - clusterBuilder.withPort(cassandraPort); + private static void useKeyspace(Session session, String keyspace) { + try { + session.execute("USE " + keyspace + ";"); + } catch (InvalidQueryException e) { + session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };"); + session.execute("USE " + keyspace + ";"); } - Cluster cluster = clusterBuilder.build(); - session = cluster.connect(); - return session; } 
- + + private static void createTable(Session session, String keyspace, String tableName) { + ResultSet execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';"); + List<Row> all = execute.all(); + boolean found = false; + for(Row row : all) { + String table = row.getString("columnfamily_name"); + if (table.equalsIgnoreCase(tableName)) { + found = true; + break; + } + } + if (!found) { + session.execute(String.format(createTableTemplate, tableName)); + LOGGER.debug("Table {} has been created", tableName); + } + } + @Reference public void setMarshaller(Marshaller marshaller) { this.marshaller = marshaller; http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/72db51a4/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java ---------------------------------------------------------------------- diff --git a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java index 5cd9e81..ec28736 100644 --- a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java +++ b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java @@ -5,7 +5,9 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; +import java.util.Dictionary; import java.util.HashMap; +import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -30,7 +32,7 @@ import com.datastax.driver.core.Session; public class CassandraAppenderTest { private static final String KEYSPACE = "decanter"; - private static final int CASSANDRA_PORT = 9142; + private static final String CASSANDRA_PORT = "9142"; private static final String CASSANDRA_HOST = "localhost"; 
private static final String TABLE_NAME = "decanter"; private static final String TOPIC = "decanter/collect/jmx"; @@ -68,8 +70,14 @@ public class CassandraAppenderTest { @Test public void testHandleEvent() throws Exception { Marshaller marshaller = new JsonMarshaller(); - CassandraAppender appender = new CassandraAppender(marshaller, KEYSPACE, TABLE_NAME, CASSANDRA_HOST, - CASSANDRA_PORT, null, null); + CassandraAppender appender = new CassandraAppender(); + Dictionary<String, Object> config = new Hashtable<String, Object>(); + config.put("cassandra.host", CASSANDRA_HOST); + config.put("cassandra.port", CASSANDRA_PORT); + config.put("keyspace.name", KEYSPACE); + config.put("table.name", TABLE_NAME); + appender.setMarshaller(marshaller); + appender.activate(config); Map<String, Object> properties = new HashMap<>(); properties.put(EventConstants.TIMESTAMP, TIMESTAMP); @@ -90,7 +98,7 @@ public class CassandraAppenderTest { private Session getSesion() { Builder clusterBuilder = Cluster.builder().addContactPoint(CASSANDRA_HOST); - clusterBuilder.withPort(CASSANDRA_PORT); + clusterBuilder.withPort(Integer.valueOf(CASSANDRA_PORT)); Cluster cluster = clusterBuilder.build(); return cluster.connect();
