STORM-1075 clean/refactor external module storm-cassandra
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5565c438 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5565c438 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5565c438 Branch: refs/heads/master Commit: 5565c43817deb2f3d7b77b5f6975dd3d16c802c4 Parents: 641300e Author: Florian Hussonnois <[email protected]> Authored: Wed Nov 18 18:20:21 2015 +0100 Committer: Florian Hussonnois <[email protected]> Committed: Tue Dec 1 22:36:17 2015 +0100 ---------------------------------------------------------------------- external/storm-cassandra/README.md | 36 +++++++--- .../storm/cassandra/Murmur3StreamGrouping.java | 46 +++++++++++- .../storm/cassandra/bolt/BaseCassandraBolt.java | 9 +-- .../bolt/BatchCassandraWriterBolt.java | 27 ++++--- .../cassandra/bolt/CassandraWriterBolt.java | 2 +- .../storm/cassandra/client/CassandraConf.java | 74 +++++++++++++++++--- .../storm/cassandra/client/ClusterFactory.java | 6 +- .../cassandra/client/impl/DefaultClient.java | 10 +-- .../storm/cassandra/executor/AsyncExecutor.java | 15 ++-- .../bolt/BatchCassandraWriterBoltTest.java | 3 +- .../cassandra/bolt/CassandraWriterBoltTest.java | 3 +- 11 files changed, 182 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/README.md ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md index 96454b6..5337b29 100644 --- a/external/storm-cassandra/README.md +++ b/external/storm-cassandra/README.md @@ -12,15 +12,18 @@ Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement* ### Configuration The following properties may be passed to storm configuration. -| **Property name** | **Description** | **Default** | -| ------------------------------------- | ----------------| -------------| -| **cassandra.keyspace** | - | | -| **cassandra.nodes** | - | {"localhost"}| -| **cassandra.username** | - | - | -| **cassandra.password** | - | - | -| **cassandra.port** | - | 9092 | -| **cassandra.output.consistencyLevel** | - | ONE | -| **cassandra.batch.size.rows** | - | 100 | +| **Property name** | **Description** | **Default** | +| ---------------------------------------------| ----------------| --------------------| +| **cassandra.keyspace** | - | | +| **cassandra.nodes** | - | {"localhost"} | +| **cassandra.username** | - | - | +| **cassandra.password** | - | - | +| **cassandra.port** | - | 9092 | +| **cassandra.output.consistencyLevel** | - | ONE | +| **cassandra.batch.size.rows** | - | 100 | +| **cassandra.retryPolicy** | - | DefaultRetryPolicy | +| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) | +| **cassandra.reconnectionPolicy.maxDelayMs** | - | 60000 (ms) | ### CassandraWriterBolt @@ -160,6 +163,21 @@ For instance, this may be used to remit a new tuple on error, or to chain querie } ``` +### Murmur3FieldGrouping + +[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) can be used to optimise cassandra writes. +The stream is partitioned among the bolt's tasks based on the specified row partition keys. + +```java +CassandraWriterBolt bolt = new CassandraWriterBolt( + insertInto("album") + .values( + with(fields("title", "year", "performer", "genre", "tracks") + ).build()); +builder.setBolt("BOLT_WRITER", bolt, 4) + .customGrouping("spout", new Murmur3StreamGrouping("title")) +``` + ## License Licensed to the Apache Software Foundation (ASF) under one http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java index a3f6887..966aacd 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java @@ -22,13 +22,18 @@ import backtype.storm.generated.GlobalStreamId; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.task.WorkerTopologyContext; import backtype.storm.topology.FailedException; +import backtype.storm.tuple.Fields; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -41,12 +46,43 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping { private List<Integer> targetTasks; + private List<Integer> partitionKeyIndexes; + + /** + * A list of partition key. The order of specified keys will be used to generate the partition key hash. + * It should respect the column order defined into the targeted CQL table. + */ + private List<String> partitionKeyNames; + + /** + * Creates a new {@link Murmur3StreamGrouping} instance. + * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}. + */ + public Murmur3StreamGrouping(String...partitionKeyNames) { + this( Arrays.asList(partitionKeyNames)); + } + + /** + * Creates a new {@link Murmur3StreamGrouping} instance. + * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}. + */ + public Murmur3StreamGrouping(List<String> partitionKeyNames) { + this.partitionKeyNames = partitionKeyNames; + } + + /** * {@inheritDoc} */ @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { this.targetTasks = targetTasks; + + this.partitionKeyIndexes = new ArrayList<>(); + Fields componentOutputFields = context.getComponentOutputFields(stream); + for (String partitionKeyName : partitionKeyNames) { + partitionKeyIndexes.add(componentOutputFields.fieldIndex(partitionKeyName)); + } } /** @@ -55,13 +91,21 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping { @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { try { - int n = Math.abs( (int) hashes(values) % targetTasks.size() ); + int n = Math.abs( (int) hashes(getKeyValues(values)) % targetTasks.size() ); return Lists.newArrayList(targetTasks.get(n)); } catch (IOException e) { throw new FailedException(e); } } + private List<Object> getKeyValues(List<Object> values) { + List<Object> keys = new ArrayList<>(); + for(Integer idx : partitionKeyIndexes) { + keys.add(values.get(idx)); + } + return keys; + } + /** * Computes the murmur3 hash for the specified values. * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java index 7211ad3..dafcb22 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java @@ -26,6 +26,7 @@ import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.utils.TupleUtils; +import backtype.storm.utils.Utils; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; import org.apache.storm.cassandra.BaseExecutionResultHandler; @@ -105,7 +106,7 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt { } public BaseCassandraBolt withOutputFields(Fields fields) { - this.outputsFields.put(null, fields); + this.outputsFields.put(Utils.DEFAULT_STREAM_ID, fields); return this; } @@ -139,7 +140,7 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt { public final void execute(Tuple input) { getAsyncHandler().flush(outputCollector); if (TupleUtils.isTick(input)) { - tick(); + onTickTuple(); outputCollector.ack(input); } else { process(input); @@ -156,14 +157,14 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt { /** * Calls by an input tick tuple. */ - abstract protected void tick(); + abstract protected void onTickTuple(); /** * {@inheritDoc} */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - Fields fields = this.outputsFields.remove(null); + Fields fields = this.outputsFields.remove(Utils.DEFAULT_STREAM_ID); if( fields != null) declarer.declare(fields); for(Map.Entry<String, Fields> entry : this.outputsFields.entrySet()) { declarer.declareStream(entry.getKey(), entry.getValue()); http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java index c4c0110..fd597df 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java @@ -42,14 +42,14 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { public static final int DEFAULT_EMIT_FREQUENCY = 2; - private static final int QUEUE_MAX_SIZE = 1000; - private LinkedBlockingQueue<Tuple> queue; private int tickFrequencyInSeconds; private long lastModifiedTimesMillis; + private int batchMaxSize = 1000; + private String componentID; private AsyncResultHandler<List<Tuple>> asyncResultHandler; @@ -79,7 +79,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) { super.prepare(stormConfig, topologyContext, outputCollector); this.componentID = topologyContext.getThisComponentId(); - this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); + this.queue = new LinkedBlockingQueue<>(batchMaxSize); this.lastModifiedTimesMillis = now(); } @@ -106,7 +106,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { * {@inheritDoc} */ @Override - protected void tick() { + protected void onTickTuple() { prepareAndExecuteStatement(); } @@ -119,7 +119,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { List<PairStatementTuple> psl = buildStatement(inputs); int sinceLastModified = updateAndGetSecondsSinceLastModified(); - LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified)); + LOG.debug(logPrefix() + "Execute cql batches with {} statements after {} seconds", size, sinceLastModified); checkTimeElapsedSinceLastExec(sinceLastModified); @@ -127,14 +127,14 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { int batchSize = 0; for (PairBatchStatementTuples batch : batchBuilder) { - LOG.debug(logPrefix() + String.format("Writing data to %s in batches of %s rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size())); + LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size()); getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs()); batchSize++; } - int pending = getAsyncExecutor().getPendingExec(); + int pending = getAsyncExecutor().getPendingTasksSize(); if (pending > batchSize) { - LOG.warn( logPrefix() + String.format("Currently pending tasks is superior to the number of submit batches(%s) : %s", batchSize, pending)); + LOG.warn( logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize, pending); } } catch (Throwable r) { @@ -157,7 +157,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { private void checkTimeElapsedSinceLastExec(int sinceLastModified) { if(sinceLastModified > tickFrequencyInSeconds) - LOG.warn( logPrefix() + String.format("The time elapsed since last execution exceeded tick tuple frequency - %d > %d seconds", sinceLastModified, tickFrequencyInSeconds)); + LOG.warn( logPrefix() + "The time elapsed since last execution exceeded tick tuple frequency - {} > {} seconds", sinceLastModified, tickFrequencyInSeconds); } private String logPrefix() { @@ -170,6 +170,15 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { } /** + * Maximum number of tuple kept in memory before inserting batches to cassandra. + * @param size the max queue size. + * @return <code>this</code> + */ + public BatchCassandraWriterBolt withQueueSize(int size) { + this.batchMaxSize = size; + return this; + } + /** * {@inheritDoc} */ @Override http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java index 663f26a..19097f2 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java @@ -63,7 +63,7 @@ public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> { * {@inheritDoc} */ @Override - protected void tick() { + protected void onTickTuple() { /** do nothing **/ } } http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java index ccee468..9201801 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java @@ -18,11 +18,17 @@ */ package org.apache.storm.cassandra.client; +import backtype.storm.utils.Utils; import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.RetryPolicy; import com.google.common.base.Objects; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Configuration used by cassandra storm components. @@ -36,6 +42,9 @@ public class CassandraConf implements Serializable { public static final String CASSANDRA_NODES = "cassandra.nodes"; public static final String CASSANDRA_PORT = "cassandra.port"; public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows"; + public static final String CASSANDRA_RETRY_POLICY = "cassandra.retryPolicy"; + public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = "cassandra.reconnectionPolicy.baseDelayMs"; + public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = "cassandra.reconnectionPolicy.maxDelayMs"; /** * The authorized cassandra username. @@ -67,6 +76,21 @@ public class CassandraConf implements Serializable { * The maximal numbers of rows per batch. */ private int batchSizeRows = 100; + + /** + * The retry policy to use for the new cluster. + */ + private String retryPolicyName; + + /** + * The base delay in milliseconds to use for the reconnection policy. + */ + private long reconnectionPolicyBaseMs; + + /** + * The maximum delay to wait between two attempts. + */ + private long reconnectionPolicyMaxMs; /** * Creates a new {@link CassandraConf} instance. @@ -81,13 +105,17 @@ public class CassandraConf implements Serializable { * @param conf The storm configuration. */ public CassandraConf(Map<String, Object> conf) { - this.username = getOrElse(conf, CASSANDRA_USERNAME, null); - this.password = getOrElse(conf, CASSANDRA_PASSWORD, null); + + this.username = (String)Utils.get(conf, CASSANDRA_USERNAME, null); + this.password = (String)Utils.get(conf, CASSANDRA_PASSWORD, null); this.keyspace = get(conf, CASSANDRA_KEYSPACE); - this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name())); - this.nodes = getOrElse(conf, CASSANDRA_NODES, "localhost").split(","); - this.batchSizeRows = getOrElse(conf, CASSANDRA_BATCH_SIZE_ROWS, 100); - this.port = conf.get(CASSANDRA_PORT) != null ? Integer.valueOf((String)conf.get(CASSANDRA_PORT)) : 9042; + this.consistencyLevel = ConsistencyLevel.valueOf((String)Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name())); + this.nodes = ((String)Utils.get(conf, CASSANDRA_NODES, "localhost")).split(","); + this.batchSizeRows = Utils.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100); + this.port = Utils.getInt(conf.get(CASSANDRA_PORT), 9042); + this.retryPolicyName = (String)Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName()); + this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L); + this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1)); } public String getUsername() { @@ -118,6 +146,24 @@ public class CassandraConf implements Serializable { return this.port; } + public long getReconnectionPolicyBaseMs() { + return reconnectionPolicyBaseMs; + } + + public long getReconnectionPolicyMaxMs() { + return reconnectionPolicyMaxMs; + } + + public RetryPolicy getRetryPolicy() { + if(this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName())) + return DowngradingConsistencyRetryPolicy.INSTANCE; + if(this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName())) + return FallthroughRetryPolicy.INSTANCE; + if(this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName())) + return DefaultRetryPolicy.INSTANCE; + throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName); + } + private <T> T get(Map<String, Object> conf, String key) { Object o = conf.get(key); if(o == null) { @@ -126,9 +172,16 @@ public class CassandraConf implements Serializable { return (T)o; } - private <T> T getOrElse(Map<String, Object> conf, String key, T def) { - T o = (T) conf.get(key); - return (o == null) ? def : o; + public static Long getLong(Object o, Long defaultValue) { + if (null == o) { + return defaultValue; + } + if (o instanceof Number) { + return ((Number) o).longValue(); + } else if (o instanceof String) { + return Long.parseLong((String) o); + } + throw new IllegalArgumentException("Don't know how to convert " + o + " to long"); } @Override @@ -141,6 +194,9 @@ public class CassandraConf implements Serializable { .add("port", port) .add("consistencyLevel", consistencyLevel) .add("batchSizeRows", batchSizeRows) + .add("retryPolicyName", retryPolicyName) + .add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs) + .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs) .toString(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java index 886f6d3..c00bfd7 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java @@ -50,8 +50,10 @@ public class ClusterFactory extends BaseBeanFactory<Cluster> { .withoutMetrics() .addContactPoints(cassandraConf.getNodes()) .withPort(cassandraConf.getPort()) - .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) - .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1))) + .withRetryPolicy(cassandraConf.getRetryPolicy()) + .withReconnectionPolicy(new ExponentialReconnectionPolicy( + cassandraConf.getReconnectionPolicyBaseMs(), + cassandraConf.getReconnectionPolicyMaxMs())) .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); final String username = cassandraConf.getUsername(); http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java index 8ed9293..945d0a8 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java @@ -77,14 +77,14 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable { @Override public synchronized Session connect() throws NoHostAvailableException { if( isDisconnected() ) { - LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName())); + LOG.info("Connected to cluster: {}", cluster.getClusterName()); for ( Host host : getAllHosts()) - LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack())); + LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack()); - LOG.info(String.format("Connect to cluster using keyspace %s", keyspace)); + LOG.info("Connect to cluster using keyspace %s", keyspace); session = cluster.connect(keyspace); } else { - LOG.warn(String.format("%s - Already connected to cluster: %s", getExecutorName(), cluster.getClusterName())); + LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName()); } if( session.isClosed() ) { @@ -107,7 +107,7 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable { @Override public void close( ) { if( cluster != null && !cluster.isClosed() ) { - LOG.info(String.format("Try to close connection to cluster: %s", cluster.getClusterName())); + LOG.info("Try to close connection to cluster: {}", cluster.getClusterName()); session.close(); cluster.close(); } http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java index 311ed11..5366c81 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; /** * Service to asynchronously executes cassandra statements. @@ -47,7 +48,7 @@ public class AsyncExecutor<T> implements Serializable { protected AsyncResultHandler<T> handler; - private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( ); + private AtomicInteger pending = new AtomicInteger(); /** * Creates a new {@link AsyncExecutor} instance. @@ -73,8 +74,8 @@ public class AsyncExecutor<T> implements Serializable { } /** - * Asynchronously executes all statements associated to the specified input. The input will be passed to - * the {@link #handler} once all queries succeed or failed. + * Asynchronously executes all statements associated to the specified input. + * The input will be passed to handler#onSuccess once all queries succeed or to handler#onFailure if any one of them fails. */ public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) { @@ -111,11 +112,11 @@ public class AsyncExecutor<T> implements Serializable { */ public SettableFuture<T> execAsync(final Statement statement, final T inputs, final AsyncResultHandler<T> handler) { final SettableFuture<T> settableFuture = SettableFuture.create(); - pending.put(settableFuture, true); + pending.incrementAndGet(); ResultSetFuture future = session.executeAsync(statement); Futures.addCallback(future, new FutureCallback<ResultSet>() { public void release() { - pending.remove(settableFuture); + pending.decrementAndGet(); } @Override @@ -139,8 +140,8 @@ public class AsyncExecutor<T> implements Serializable { /** * Returns the number of currently executed tasks which are not yet completed. */ - public int getPendingExec( ) { - return this.pending.size(); + public int getPendingTasksSize() { + return this.pending.intValue(); } public void shutdown( ) { http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java index 8d80ee1..4253189 100644 --- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java @@ -23,6 +23,7 @@ import com.datastax.driver.core.ResultSet; import org.apache.storm.cassandra.WeatherSpout; import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import static org.apache.storm.cassandra.DynamicStatementBuilder.*; @@ -36,7 +37,7 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest { public static final String SPOUT_MOCK = "spout-mock"; public static final String BOLT_WRITER = "writer"; - @Test + @Test @Ignore("The sleep method should be used in tests") public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() { executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto())); } http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java index e1a9e9f..7717d4d 100644 --- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java @@ -23,6 +23,7 @@ import com.datastax.driver.core.ResultSet; import org.apache.storm.cassandra.WeatherSpout; import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import static org.apache.storm.cassandra.DynamicStatementBuilder.*; @@ -35,7 +36,7 @@ public class CassandraWriterBoltTest extends BaseTopologyTest { public static final String SPOUT_MOCK = "spout-mock"; public static final String BOLT_WRITER = "writer"; - @Test + @Test @Ignore("The sleep method should be used in tests") public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() { executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto()))); }
