Repository: storm
Updated Branches:
  refs/heads/master 0acc1cee6 -> 224c846ec


http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java
new file mode 100644
index 0000000..df00530
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.context;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Simple interface used for providing services based on the storm 
configuration.
+ */
+public interface BeanFactory<T> extends Serializable {
+
+    /**
+     * Sets the storm context.
+     * @param context
+     */
+    public void setStormContext(WorkerCtx context);
+
+    /**
+     * Return an instance, which may be shared or independent, of the 
specified type.
+     * @param stormConf The storm configuration
+     * @return
+     */
+    T get(Map<String, Object> stormConf);
+
+
+    /**
+     * Returns a new copy if this factory.
+     * @return a new {@link BeanFactory} instance.
+     */
+    public BeanFactory<T> newInstance();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java
new file mode 100644
index 0000000..384d84c
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.context;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Simple class used to register singletons within a storm worker.
+ */
+public class WorkerCtx implements Serializable {
+
+    private static final ConcurrentMap<Class, BeanFactory<?>> workerCtx = new 
ConcurrentHashMap<>();
+
+    private Map<Class, BeanFactory<?>> componentCtx = new HashMap<>();
+
+    /**
+     * Creates a new {@link WorkerCtx} instance.
+     */
+    public WorkerCtx() {
+        super();
+    }
+
+    /**
+     * Register a bean provider for a specified type.
+     */
+    public <T> void register(Class<T> clazz, BeanFactory<T> provider) {
+        componentCtx.put(clazz, provider);
+    }
+
+    /**
+     * Return an instance provider of the specified type.
+     * @throws RuntimeException if no bean provider can be resolve for the 
given class.
+     * @return
+     */
+    protected <T> BeanFactory<T> getBeanfactory(Class<T> clazz) {
+        BeanFactory<T> factory = (BeanFactory<T>) this.componentCtx.get(clazz);
+        if( factory == null) throw new RuntimeException("Cannot resolve bean 
factory for class : " + clazz.getCanonicalName());
+        factory.setStormContext(this);
+        return factory;
+    }
+
+    /**
+     * Return an instance, which is shared (within a Worker), of the specified 
type.
+     * @return
+     */
+    public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> stormConf) {
+        return getWorkerBean(clazz, stormConf,false);
+    }
+
+    /**
+     * Return an instance, which is shared (within a Worker), of the specified 
type.
+     *
+     * @param clazz the class of the bean.
+     * @param stormConf the storm configuration
+     * @param force if <code>true</code>= create a new instance even if one 
already exist.
+     *
+     * @return a instance of type {@link T}.
+     */
+    public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> stormConf, 
boolean force) {
+        if( force ) workerCtx.remove(clazz);
+        BeanFactory<T> factory = (BeanFactory<T>)  this.workerCtx.get(clazz);
+        if( factory == null) {
+            BeanFactory<T> instance = getBeanfactory(clazz).newInstance();
+            workerCtx.putIfAbsent(clazz, instance);
+            factory = (BeanFactory<T>) this.workerCtx.get(clazz);
+        }
+        return factory.get((Map<String, Object>)stormConf);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
new file mode 100644
index 0000000..311ed11
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Service to asynchronously executes cassandra statements.
+ */
+public class AsyncExecutor<T> implements Serializable {
+
+    private final static Logger LOG = 
LoggerFactory.getLogger(AsyncExecutor.class);
+
+    protected Session session;
+
+    protected ExecutorService executorService;
+
+    protected AsyncResultHandler<T> handler;
+
+    private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( 
);
+
+    /**
+     * Creates a new {@link AsyncExecutor} instance.
+     */
+    protected AsyncExecutor(Session session, AsyncResultHandler<T> handler) {
+        this(session, newSingleThreadExecutor(), handler);
+    }
+
+    /**
+     * Creates a new {@link AsyncExecutor} instance.
+     *
+     * @param session The cassandra session.
+     * @param executorService The executor service responsible to execute 
handler.
+     */
+    private AsyncExecutor(Session session, ExecutorService executorService, 
AsyncResultHandler<T> handler) {
+        this.session   = session;
+        this.executorService = executorService;
+        this.handler = handler;
+    }
+
+    protected static ExecutorService newSingleThreadExecutor() {
+        return Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build());
+    }
+
+    /**
+     * Asynchronously executes all statements associated to the specified 
input. The input will be passed to
+     * the {@link #handler} once all queries succeed or failed.
+     */
+    public List<SettableFuture<T>> execAsync(List<Statement> statements, final 
T input) {
+
+        List<SettableFuture<T>> settableFutures = new 
ArrayList<>(statements.size());
+
+        for(Statement s : statements)
+            settableFutures.add(execAsync(s, input, 
AsyncResultHandler.NO_OP_HANDLER));
+
+        ListenableFuture<List<T>> allAsList = 
Futures.allAsList(settableFutures);
+        Futures.addCallback(allAsList, new FutureCallback<List<T>>(){
+            @Override
+            public void onSuccess(List<T> inputs) {
+                handler.success(input);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                handler.failure(t, input);
+            }
+        }, executorService);
+        return settableFutures;
+    }
+
+    /**
+     * Asynchronously executes the specified batch statement. Inputs will be 
passed to
+     * the {@link #handler} once query succeed or failed.
+     */
+    public SettableFuture<T> execAsync(final Statement statement, final T 
inputs) {
+        return execAsync(statement, inputs, handler);
+    }
+    /**
+     * Asynchronously executes the specified batch statement. Inputs will be 
passed to
+     * the {@link #handler} once query succeed or failed.
+     */
+    public SettableFuture<T> execAsync(final Statement statement, final T 
inputs, final AsyncResultHandler<T> handler) {
+        final SettableFuture<T> settableFuture = SettableFuture.create();
+        pending.put(settableFuture, true);
+        ResultSetFuture future = session.executeAsync(statement);
+        Futures.addCallback(future, new FutureCallback<ResultSet>() {
+            public void release() {
+                pending.remove(settableFuture);
+            }
+
+            @Override
+            public void onSuccess(ResultSet result) {
+                release();
+                settableFuture.set(inputs);
+                handler.success(inputs);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error(String.format("Failed to execute statement '%s' ", 
statement), t);
+                release();
+                settableFuture.setException(t);
+                handler.failure(t, inputs);
+            }
+        }, executorService);
+        return settableFuture;
+    }
+
+    /**
+     * Returns the number of currently executed tasks which are not yet 
completed.
+     */
+    public int getPendingExec( ) {
+        return this.pending.size();
+    }
+
+    public void shutdown( ) {
+        if( ! executorService.isShutdown() ) {
+            LOG.info("shutting down async handler executor");
+            this.executorService.shutdownNow();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
new file mode 100644
index 0000000..0c684c0
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.Session;
+
+/**
+ * This class must be used to obtain a single instance of {@link 
AsyncExecutor} per storm executor.
+ */
+public class AsyncExecutorProvider {
+
+    private static final ThreadLocal<AsyncExecutor> localAsyncExecutor = new 
ThreadLocal<>();
+
+    /**
+     * Returns a new {@link AsyncExecutor} per storm executor.
+     */
+    public static <T> AsyncExecutor getLocal(Session session, 
AsyncResultHandler<T> handler) {
+        AsyncExecutor<T> executor = localAsyncExecutor.get();
+        if( executor == null ) {
+            localAsyncExecutor.set(executor = new AsyncExecutor<>(session, 
handler));
+        }
+        return executor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java
new file mode 100644
index 0000000..9b51696
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import backtype.storm.task.OutputCollector;
+
+import java.io.Serializable;
+
+/**
+ * Default handler for batch asynchronous execution.
+ */
+public interface AsyncResultHandler<T> extends Serializable {
+
+    public static final AsyncResultHandler NO_OP_HANDLER = new 
AsyncResultHandler() {
+        @Override
+        public void failure(Throwable t, Object inputs) {
+            /** no-operation **/
+        }
+
+        @Override
+        public void success(Object inputs) {
+            /** no-operation **/
+        }
+
+        @Override
+        public void flush(OutputCollector collector) {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /**
+     * This method is responsible for failing specified inputs.
+     *
+     * @param t The cause the failure.
+     * @param inputs The input tuple proceed.
+     */
+    void failure(Throwable t, T inputs);
+
+    /**
+     * This method is responsible for acknowledging specified inputs.
+     *
+     * @param inputs The input tuple proceed.
+     */
+    void success(T inputs) ;
+
+    void flush(OutputCollector collector);
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java
new file mode 100644
index 0000000..d0f5e1d
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Lists;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+
+import java.util.List;
+
+/**
+ * This class is responsible to collect input tuples proceed.
+ */
+public interface ExecutionResultCollector {
+
+    void handle(OutputCollector collector, ExecutionResultHandler handler);
+
+    public static final class SucceedCollector implements 
ExecutionResultCollector {
+
+        private final List<Tuple> inputs;
+
+        /**
+         * Creates a new {@link ExecutionResultCollector} instance.
+         * @param input the input tuple.
+         */
+        public SucceedCollector(Tuple input) {
+           this(Lists.newArrayList(input));
+        }
+
+        /**
+         * Creates a new {@link ExecutionResultCollector} instance.
+         * @param inputs the input tuple.
+         */
+        public SucceedCollector(List<Tuple> inputs) {
+            this.inputs = inputs;
+        }
+
+        /**
+         * Calls {@link 
ExecutionResultHandler#onQuerySuccess(backtype.storm.task.OutputCollector, 
backtype.storm.tuple.Tuple)} before
+         * acknowledging an single input tuple.
+         */
+        @Override
+        public void handle(OutputCollector collector, ExecutionResultHandler 
handler) {
+            for(Tuple t : inputs) handler.onQuerySuccess(collector, t);
+            for(Tuple t : inputs) collector.ack(t);
+        }
+    }
+
+    public static final class FailedCollector implements 
ExecutionResultCollector {
+
+        private final Throwable cause;
+        private final List<Tuple> inputs;
+
+        /**
+         * Creates a new {@link FailedCollector} instance.
+         * @param input the input tuple.
+         * @param cause the cause of the error.
+         */
+        public FailedCollector(Tuple input, Throwable cause) {
+            this(Lists.newArrayList(input), cause);
+        }
+
+        /**
+         * Creates a new {@link FailedCollector} instance.
+         * @param inputs the input tuple.
+         * @param cause the cause of the error.
+         */
+        public FailedCollector(List<Tuple> inputs, Throwable cause) {
+            this.inputs = inputs;
+            this.cause = cause;
+        }
+
+        /**
+         * Calls {@link ExecutionResultHandler#onThrowable(Throwable, 
backtype.storm.task.OutputCollector, backtype.storm.tuple.Tuple)} .
+         */
+        @Override
+        public void handle(OutputCollector collector, ExecutionResultHandler 
handler) {
+            handler.onThrowable(cause, collector, inputs);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
new file mode 100644
index 0000000..c81da8c
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor.impl;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.ExecutionResultCollector;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+public class BatchAsyncResultHandler implements 
AsyncResultHandler<List<Tuple>> {
+
+    private ConcurrentLinkedQueue<ExecutionResultCollector> completed;
+
+    private ExecutionResultHandler handler;
+
+    /**
+     * Creates a new {@link BatchAsyncResultHandler} instance.
+     * @param handler
+     */
+    public BatchAsyncResultHandler(ExecutionResultHandler handler) {
+        this.handler = handler;
+        this.completed = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * This method is responsible for failing specified inputs.
+     *
+     * The default method does no-operation.
+     */
+    public void failure(Throwable t, List<Tuple> input) {
+        completed.offer(new ExecutionResultCollector.FailedCollector(input, 
t));
+    }
+    /**
+     * This method is responsible for acknowledging specified inputs.
+     *
+     * The default method does no-operation.
+     */
+    public void success(List<Tuple> input) {
+        completed.offer(new ExecutionResultCollector.SucceedCollector(input));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void flush(final OutputCollector collector) {
+        ExecutionResultCollector poll;
+        while( (poll = completed.poll()) != null ) {
+            poll.handle(collector, handler);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
new file mode 100644
index 0000000..62a5a3b
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor.impl;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.ExecutionResultCollector;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> {
+
+    private ConcurrentLinkedQueue<ExecutionResultCollector> completed;
+
+    private ExecutionResultHandler handler;
+
+    /**
+     * Creates a new {@link SingleAsyncResultHandler} instance.
+     * @param handler
+     */
+    public SingleAsyncResultHandler(ExecutionResultHandler handler) {
+        this.handler = handler;
+        this.completed = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * This method is responsible for failing specified inputs.
+     *
+     * The default method does no-operation.
+     */
+    public void failure(Throwable t, Tuple input) {
+        completed.offer(new ExecutionResultCollector.FailedCollector(input, 
t));
+    }
+    /**
+     * This method is responsible for acknowledging specified inputs.
+     *
+     * The default method does no-operation.
+     */
+    public void success(Tuple input) {
+        completed.offer(new ExecutionResultCollector.SucceedCollector(input));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void flush(final OutputCollector collector) {
+        ExecutionResultCollector poll;
+        while( (poll = completed.poll()) != null ) {
+            poll.handle(collector, handler);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
new file mode 100644
index 0000000..32ff1de
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+public class BatchStatementTupleMapper implements CQLStatementTupleMapper {
+
+    private final List<CQLStatementTupleMapper> mappers;
+    private final BatchStatement.Type type;
+
+    /**
+     * Creates a new {@link BatchStatementTupleMapper} instance.
+     * @param type
+     * @param mappers
+     */
+    public BatchStatementTupleMapper(BatchStatement.Type type, 
List<CQLStatementTupleMapper> mappers) {
+        this.mappers = new ArrayList<>(mappers);
+        this.type = type;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        final BatchStatement batch = new BatchStatement(this.type);
+        for(CQLStatementTupleMapper m : mappers)
+            batch.addAll(m.map(conf, session, tuple));
+        return Arrays.asList((Statement)batch);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
new file mode 100644
index 0000000..baa4685
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.querybuilder.Clause;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to
+ * a list of cassandra {@link com.datastax.driver.core.querybuilder.Clause}s.
+ *
+ */
+public interface CQLClauseTupleMapper extends Serializable {
+
+    List<Clause> map(ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
new file mode 100644
index 0000000..6c04d1c
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import java.io.Serializable;
+
+
+public interface CQLStatementBuilder extends Serializable {
+
+    /**
+     * Builds a new {@link CQLStatementTupleMapper} instance.
+     * @return a new CQLStatementMapper instance.
+     */
+    <T extends CQLStatementTupleMapper> T build();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
new file mode 100644
index 0000000..32c86dd
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.google.common.base.Optional;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+
+/**
+ * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL 
{@link com.datastax.driver.core.Statement}.
+ *
+ */
+public interface CQLStatementTupleMapper extends Serializable {
+
+    public static final String FIELD_KEYSPACE =  "keyspace";
+    public static final String FIELD_TABLE    =  "table";
+    public static final String FIELD_VALUES   =  "value";
+
+    /**
+     * Maps a given tuple to one or multiple CQL statements.
+     *
+     * @param conf the storm configuration map.
+     * @param session the cassandra session.
+     * @param tuple the incoming tuple to map.
+     * @return a list of {@link com.datastax.driver.core.Statement}.
+     */
+    List<Statement> map(Map conf, Session session, ITuple tuple);
+
+    public static class InsertCQLStatementTupleMapper implements 
CQLStatementTupleMapper {
+        @Override
+        public List<Statement> map(Map conf, Session session, ITuple tuple) {
+            Optional<String> ks = 
Optional.fromNullable(tuple.contains(FIELD_KEYSPACE) ? 
tuple.getStringByField(FIELD_KEYSPACE) : null);
+            String table = tuple.getStringByField(FIELD_TABLE);
+            Map<String, Object> values = (Map<String, Object>) 
tuple.getValueByField(FIELD_VALUES);
+
+            final Insert stmt = (ks.isPresent()) ? insertInto(ks.get(), table) 
: insertInto(table);
+            for(Map.Entry<String, Object> v : values.entrySet())
+                stmt.value(v.getKey(), v.getValue());
+
+            return Arrays.asList((Statement)stmt);
+        }
+    }
+
+    public static class DynamicCQLStatementTupleMapper implements 
CQLStatementTupleMapper {
+        private List<CQLStatementBuilder> builders;
+
+        public DynamicCQLStatementTupleMapper(List<CQLStatementBuilder> 
builders) {
+            this.builders = builders;
+        }
+
+        @Override
+        public List<Statement> map(Map conf, Session session, ITuple tuple) {
+            List<Statement> statements = new LinkedList<>();
+            for(CQLStatementBuilder b : builders) {
+                statements.addAll(b.build().map(conf, session, tuple));
+            }
+            return statements;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
new file mode 100644
index 0000000..574eac9
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to a 
table name.
+ *
+ */
+public interface CQLTableTupleMapper extends Serializable {
+
+    /**
+     * Returns a table's name from the specified tuple.
+     *
+     * @param tuple the incoming tuple.
+     * @return the table name.
+     */
+    String map(ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
new file mode 100644
index 0000000..2cb8e4c
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to Map 
of values of a CQL statement.
+ *
+ */
+public interface CQLValuesTupleMapper extends Serializable {
+
+    /**
+     * Map the specified {@code tuple} to values of CQL statement(s).
+     * @param tuple the incoming tuple to map.
+     * @return values of CQL statement(s)
+     */
+    Map<String, Object> map(ITuple tuple);
+
+    /**
+     * Default {@link CQLValuesTupleMapper} implementation to get specifics 
tuple's fields.
+     */
+    public static class WithFieldTupleMapper implements CQLValuesTupleMapper {
+        private List<FieldSelector> fields;
+
+        public WithFieldTupleMapper(List<FieldSelector> fields) {
+            this.fields = fields;
+        }
+
+        @Override
+        public Map<String, Object> map(ITuple tuple) {
+            Map<String, Object> ret = new LinkedHashMap<>();
+            for(FieldSelector fs : fields)
+                fs.selectAndPut(tuple, ret);
+            return ret;
+        }
+    }
+
+    /**
+     * Default {@link CQLValuesTupleMapper} implementation to get all tuple's 
fields.
+     */
+    public static class AllTupleMapper implements CQLValuesTupleMapper {
+        @Override
+        public Map<String, Object> map(ITuple tuple) {
+            Map<String, Object> ret = new LinkedHashMap<>();
+            for(String name  : tuple.getFields())
+                ret.put(name, tuple.getValueByField(name));
+            return ret;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java
new file mode 100644
index 0000000..4cc4c49
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java
@@ -0,0 +1,83 @@
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * This interface may be used to retrieve a cassandra bound query either from 
storm config
+ * or the tuple being proceed.
+ *
+ */
+public interface ContextQuery extends Serializable {
+
+    /**
+     * Resolves a cassandra bound query.
+     *
+     * @param config the storm configuration
+     * @param tuple the tuple being proceed.
+     *
+     * @return a string bound query.
+     */
+    public String resolves(Map config, ITuple tuple);
+
+    /**
+     * Static implementation of {@link ContextQuery} interface.
+     */
+    public static final class StaticContextQuery implements ContextQuery {
+        private final String value;
+
+        /**
+         * Creates a new {@link StaticContextQuery} instance.
+         * @param value
+         */
+        public StaticContextQuery(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public String resolves(Map config, ITuple tuple) {
+            return value;
+        }
+    }
+
+    /**
+     * Default {@link BoundQueryContext} implementation to retrieve a bound 
query
+     * identified by the provided key.
+     */
+    public static final class BoundQueryContext implements ContextQuery {
+        private String key;
+
+        public BoundQueryContext(String key) {
+            this.key = key;
+        }
+
+        @Override
+        public String resolves(Map config, ITuple tuple) {
+            if (config.containsKey(key)) return (String) config.get(key);
+
+            throw new IllegalArgumentException("Bound query '" + key + "' does 
not exist in configuration");
+        }
+    }
+
+    /**
+     * Default {@link BoundQueryNamedByFieldContext} implementation to 
retrieve a bound query named by
+     * the value of a specified tuple field.
+     */
+    public static final class BoundQueryNamedByFieldContext implements 
ContextQuery {
+
+        private String fieldName;
+
+        public BoundQueryNamedByFieldContext(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String resolves(Map config, ITuple tuple) {
+            String name = tuple.getStringByField(fieldName);
+            if (config.containsKey(name)) return (String) config.get(name);
+            throw new IllegalArgumentException("Bound query '" + name + "' 
does not exist in configuration");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
new file mode 100644
index 0000000..683824e
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL 
{@link com.datastax.driver.core.Statement}.
+ *
+ */
+public abstract class SimpleCQLStatementTupleMapper implements 
CQLStatementTupleMapper, Serializable {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        return Arrays.asList(map(tuple));
+    }
+
+    /**
+     * Maps a given tuple to a single CQL statements.
+     *
+     * @param tuple the incoming tuple to map.
+     * @return a list of {@link com.datastax.driver.core.Statement}.
+     */
+    public abstract Statement map(ITuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
new file mode 100644
index 0000000..4348253
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.cassandra.query.ContextQuery.*;
+
+
+public class BoundStatementMapperBuilder implements Serializable {
+    private final ContextQuery contextQuery;
+
+    /**
+     * Creates a new {@link BoundStatementMapperBuilder} instance.
+     * @param cql
+     */
+    public BoundStatementMapperBuilder(String cql) {
+        this.contextQuery = new StaticContextQuery(cql);
+    }
+
+    /**
+     * Creates a new {@link BoundStatementMapperBuilder} instance.
+     * @param contextQuery
+     */
+    public BoundStatementMapperBuilder(ContextQuery contextQuery) {
+        this.contextQuery = contextQuery;
+    }
+
+    public CQLStatementTupleMapper bind(final CQLValuesTupleMapper mapper) {
+        return new CQLBoundStatementTupleMapper(contextQuery, mapper);
+    }
+
+    public static class CQLBoundStatementTupleMapper implements 
CQLStatementTupleMapper {
+
+        private final ContextQuery contextQuery;
+
+        private final CQLValuesTupleMapper mapper;
+
+        private Map<String, PreparedStatement> cache = new HashMap<>();
+
+        /**
+         * Creates a new {@link CQLBoundStatementTupleMapper} instance.
+         *
+         * @param contextQuery
+         * @param mapper
+         */
+        CQLBoundStatementTupleMapper(ContextQuery contextQuery, 
CQLValuesTupleMapper mapper) {
+            this.contextQuery = contextQuery;
+            this.mapper = mapper;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public List<Statement> map(Map config, Session session, ITuple tuple) {
+            final Map<String, Object> values = mapper.map(tuple);
+            final String query = contextQuery.resolves(config, tuple);
+            Object[] objects = values.values().toArray(new 
Object[values.size()]);
+            PreparedStatement statement = getPreparedStatement(session, query);
+            return Arrays.asList((Statement)statement.bind(objects));
+        }
+
+        /**
+         * Get or prepare a statement using the specified session and the 
query.
+         * *
+         * @param session The cassandra session.
+         * @param query The CQL query to prepare.
+         */
+        private PreparedStatement getPreparedStatement(Session session, String 
query) {
+            PreparedStatement statement = cache.get(query);
+            if( statement == null) {
+                statement = session.prepare(query);
+                cache.put(query, statement);
+            }
+            return statement;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
new file mode 100644
index 0000000..8eddb04
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.CQLTableTupleMapper;
+import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
+import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class InsertStatementBuilder<T extends CQLValuesTupleMapper> implements 
CQLStatementBuilder {
+    /**
+     * Optional name of the table into insert.
+     */
+    protected final String table;
+    /**
+     * Optional keyspace name to insert.
+     */
+    protected final String keyspace;
+    /**
+     * Optional mapper to retrieve a table name from tuple.
+     */
+    protected final CQLTableTupleMapper mapper2TableName;
+
+    protected T valuesMapper;
+
+    protected boolean ifNotExist;
+
+    protected Integer ttlInSeconds;
+
+    protected ConsistencyLevel level;
+
+    /**
+     * Creates a new {@link InsertStatementBuilder} instance.
+     *
+     * @param table    name of the table into insert.
+     */
+    public InsertStatementBuilder(String table) {
+        this(null, table, null);
+    }
+
+    /**
+     * Creates a new {@link InsertStatementBuilder} instance.
+     *
+     * @param table    name of the table into insert.
+     * @param keyspace the keyspace's name to which the table belongs
+     */
+    public InsertStatementBuilder(String table, String keyspace) {
+        this(keyspace, table, null);
+    }
+
+    /**
+     * Creates a new {@link InsertStatementBuilder} instance.
+     *
+     * @param mapper2TableName the mapper that specify the table in which 
insert
+     * @param keyspace         the name of the keyspace to which the table 
belongs
+     */
+    public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName, String 
keyspace) {
+        this(keyspace, null, mapper2TableName);
+    }
+    /**
+     * Creates a new {@link InsertStatementBuilder} instance.
+     *
+     * @param mapper2TableName the mapper that specify the table in which 
insert
+     */
+    public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName) {
+        this(null, null, mapper2TableName);
+    }
+
+    private InsertStatementBuilder(String keyspace, String table, 
CQLTableTupleMapper mapper2TableName) {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.mapper2TableName = mapper2TableName;
+    }
+
+    /**
+     * Adds "IF NOT EXISTS" clause to the statement.
+     * @see com.datastax.driver.core.querybuilder.Insert#ifNotExists()
+     */
+    public InsertStatementBuilder ifNotExists() {
+        this.ifNotExist = true;
+        return this;
+    }
+
+    public InsertStatementBuilder usingTTL(Long time, TimeUnit unit) {
+        this.ttlInSeconds = (int) unit.toSeconds(time);
+        return this;
+    }
+
+    /**
+     * Sets the consistency level used for this statement.
+     *
+     * @param level a ConsistencyLevel.
+     */
+    public InsertStatementBuilder withConsistencyLevel(ConsistencyLevel level) 
{
+        this.level = level;
+        return this;
+    }
+
+    /**
+     * Maps tuple to values.
+     *
+     * @see com.datastax.driver.core.querybuilder.Insert#value(String, Object)
+     */
+    public InsertStatementBuilder values(final T valuesMapper) {
+        this.valuesMapper = valuesMapper;
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SimpleCQLStatementTupleMapper build() {
+        Preconditions.checkState(null != table, "Table name must not be null");
+        return new SimpleCQLStatementTupleMapper() {
+            @Override
+            public Statement map(ITuple tuple) {
+                Insert stmt = QueryBuilder.insertInto(keyspace, table);
+                for(Map.Entry<String, Object> entry : 
valuesMapper.map(tuple).entrySet())
+                    stmt.value(entry.getKey(), entry.getValue());
+                if (ttlInSeconds != null) 
stmt.using(QueryBuilder.ttl(ttlInSeconds));
+                if (ifNotExist) stmt.ifNotExists();
+                if (level != null) stmt.setConsistencyLevel(level);
+                return stmt;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
new file mode 100644
index 0000000..70696ab
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Update;
+import org.apache.storm.cassandra.query.CQLClauseTupleMapper;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
+import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+
+import java.util.Map;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
+
+public final class UpdateStatementBuilder implements CQLStatementBuilder {
+
+    /**
+     * The name of table to update. 
+     */
+    private final String table;
+    /**
+     * The keyspace of the table.
+     */
+    private final String keyspace;
+
+    private CQLValuesTupleMapper valuesMapper;
+
+    private CQLClauseTupleMapper clausesMapper;
+
+    private CQLClauseTupleMapper onlyIfClausesMapper;
+    /**
+     * Creates a new {@link UpdateStatementBuilder} instance.
+     * @param table the name of table to update.
+     */
+    public UpdateStatementBuilder(String table) {
+        this(table, null);
+    }
+
+    /**
+     * Creates a new {@link UpdateStatementBuilder} instance.
+     * @param table the name of table to update.
+     * @param keyspace the keyspace of the table.
+     */
+    public UpdateStatementBuilder(String table, String keyspace) {
+        this.table = table;
+        this.keyspace = keyspace;
+    }
+
+    /**
+     * Maps output tuple to values.
+     * @see 
com.datastax.driver.core.querybuilder.Update#with(com.datastax.driver.core.querybuilder.Assignment)
+     */
+    public UpdateStatementBuilder with(final CQLValuesTupleMapper values) {
+        this.valuesMapper = values;
+        return this;
+    }
+
+    /**
+     * Maps output tuple to some Where clauses.
+     * @see 
com.datastax.driver.core.querybuilder.Update#where(com.datastax.driver.core.querybuilder.Clause)
+     */
+    public UpdateStatementBuilder where(final CQLClauseTupleMapper clauses) {
+        this.clausesMapper = clauses;
+        return this;
+    }
+
+    /**
+     * Maps output tuple to some If clauses.
+     * @see 
com.datastax.driver.core.querybuilder.Update#onlyIf(com.datastax.driver.core.querybuilder.Clause)
+     */
+    public UpdateStatementBuilder onlyIf(final CQLClauseTupleMapper clauses) {
+        this.onlyIfClausesMapper = clauses;
+        return this;
+    }
+
+    @Override
+    public SimpleCQLStatementTupleMapper build() {
+        return new SimpleCQLStatementTupleMapper() {
+            @Override
+            public Statement map(ITuple tuple) {
+                Update stmt = QueryBuilder.update(keyspace, table);
+                for(Map.Entry<String, Object> entry : 
valuesMapper.map(tuple).entrySet())
+                    stmt.with(set(entry.getKey(), entry.getValue()));
+                for(Clause clause : clausesMapper.map(tuple))
+                    stmt.where(clause);
+                if( hasIfClauses())
+                    for(Clause clause : onlyIfClausesMapper.map(tuple)) {
+                        stmt.onlyIf(clause);
+                    }
+                return stmt;
+            }
+        };
+    }
+
+    private boolean hasIfClauses() {
+        return onlyIfClausesMapper != null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
new file mode 100644
index 0000000..b68138a
--- /dev/null
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.selector;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.utils.UUIDs;
+
+import java.io.Serializable;
+import java.util.Map;
+
+
+public class FieldSelector implements Serializable {
+
+    private String as;
+
+    private String field;
+
+    private boolean isNow;
+
+    /**
+     * Creates a new {@link FieldSelector} instance.
+     * @param field the name of value.
+     */
+    public FieldSelector(String field) {
+        this.field = field;
+    }
+
+    public void selectAndPut(ITuple t, Map<String, Object> values) {
+        values.put(as != null ? as : field, isNow ? UUIDs.timeBased() : 
t.getValueByField(field));
+
+    }
+
+    /**
+     * Sets the fields as an automatically generated TimeUUID.
+     * @return this.
+     */
+    public FieldSelector now() {
+        this.isNow = true;
+        return this;
+    }
+
+    /**
+     * Sets an alias for this field.
+     *
+     * @param as the alias name.
+     * @return this.
+     */
+    public FieldSelector as(String as) {
+        this.as = as;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
new file mode 100644
index 0000000..22e12da
--- /dev/null
+++ 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Statement;
+import com.google.common.collect.Maps;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.cassandraunit.CassandraCQLUnit;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.mockito.Mockito.when;
+
+public class DynamicStatementBuilderTest {
+
+    private static final Date NOW = new Date();
+
+    private static final Tuple mockTuple;
+
+    @Rule
+    public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new 
ClassPathCQLDataSet("schema.cql","weather"));
+
+    static {
+        mockTuple = Mockito.mock(Tuple.class);
+        when(mockTuple.getValueByField("weatherstation_id")).thenReturn("1");
+        when(mockTuple.getValueByField("event_time")).thenReturn(NOW);
+        when(mockTuple.getValueByField("temperature")).thenReturn("0°C");
+        when(mockTuple.getFields()).thenReturn(new Fields("weatherstation_id", 
"event_time", "temperature"));
+    }
+
+    @Test
+    public void 
shouldBuildMultipleStaticInsertStatementGivenKeyspaceAndAllMapper() {
+        executeStatementAndAssert(
+                async(
+                        insertInto("weather", "temperature").values(all()),
+                        insertInto("weather", "temperature").values(all())
+                ),
+                "INSERT INTO 
weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + 
NOW.getTime() + ",'0°C');",
+                "INSERT INTO 
weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + 
NOW.getTime() + ",'0°C');"
+        );
+    }
+
+    @Test
+    public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
+        executeStatementAndAssert(insertInto("weather", 
"temperature").values(all()).build(),
+                "INSERT INTO 
weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + 
NOW.getTime() + ",'0°C');");
+    }
+
+    @Test
+    public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
+        
executeStatementAndAssert(insertInto("temperature").values(all()).build(),
+                "INSERT INTO 
temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + 
NOW.getTime() + ",'0°C');");
+    }
+
+    @Test
+    public void 
shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
+        
executeStatementAndAssert(insertInto("temperature").values(with(fields("weatherstation_id",
 "event_time", "temperature"))).build(),
+                "INSERT INTO 
temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + 
NOW.getTime() + ",'0°C');");
+    }
+
+    @Test
+    public void 
shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
+        executeBatchStatementAndAssert(loggedBatch(
+                
insertInto("temperature").values(with(fields("weatherstation_id", "event_time", 
"temperature")))
+        ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) 
VALUES ('1'," + NOW.getTime() + ",'0°C');");
+    }
+
+    @Test
+    public void 
shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
+        executeBatchStatementAndAssert(unLoggedBatch(
+                
insertInto("temperature").values(with(fields("weatherstation_id", "event_time", 
"temperature")))
+        ),  "INSERT INTO temperature(weatherstation_id,event_time,temperature) 
VALUES ('1'," + NOW.getTime() + ",'0°C');");
+    }
+
+    private void executeBatchStatementAndAssert(CQLStatementTupleMapper 
mapper, String... results) {
+        List<Statement> map = mapper.map(Maps.newHashMap(), 
cassandraCQLUnit.session, mockTuple);
+
+        BatchStatement statement = (BatchStatement)map.get(0);
+        Collection<Statement> statements = statement.getStatements();
+        Assert.assertEquals(results.length, statements.size());
+
+        for(Statement s : statements)
+            Assert.assertTrue(Arrays.asList(results).contains(s.toString()));
+    }
+
+
+    private void executeStatementAndAssert(CQLStatementTupleMapper mapper, 
String... expected) {
+        List<Statement> map = mapper.map(Maps.newHashMap(), 
cassandraCQLUnit.session, mockTuple);
+
+        List<String> listExpected = Arrays.asList(expected);
+        for( int i=0; i< map.size(); i++) {
+            Assert.assertEquals(listExpected.get(i), map.get(i).toString());
+        }
+    }
+
+    @Test
+    public void shouldBuildStaticBoundStatement() {
+        CQLStatementTupleMapper mapper = boundQuery("INSERT INTO 
weather.temperature(weatherstation_id, event_time, temperature) VALUES(?, ?, 
?)")
+                .bind(with(field("weatherstation_id"), 
field("event_time").now(), field("temperature")));
+        List<Statement> map = mapper.map(Maps.newHashMap(), 
cassandraCQLUnit.session, mockTuple);
+        Statement statement = map.get(0);
+        Assert.assertNotNull(statement);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
new file mode 100644
index 0000000..49a1f80
--- /dev/null
+++ 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.junit.Assert;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class WeatherSpout extends BaseRichSpout {
+
+    private SpoutOutputCollector spoutOutputCollector;
+
+    private String stationID;
+
+    private AtomicLong maxQueries;
+
+    private AtomicLong acks = new AtomicLong(0);
+
+    private AtomicLong emit = new AtomicLong(0);
+
+    /**
+     * Creates a new {@link WeatherSpout} instance.
+     * @param stationID The station ID.
+     */
+    public WeatherSpout(String stationID, int maxQueries) {
+        this.stationID = stationID;
+        this.maxQueries = new AtomicLong(maxQueries);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 
{
+        outputFieldsDeclarer.declare(new Fields("weatherstation_id", 
"temperature"));
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, 
SpoutOutputCollector spoutOutputCollector) {
+        this.spoutOutputCollector = spoutOutputCollector;
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        acks.incrementAndGet();
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        Assert.fail("Must never get fail tuple : " + msgId);
+    }
+
+    @Override
+    public void close() {
+        Assert.assertEquals(acks.get(), emit.get());
+    }
+
+    @Override
+    public void nextTuple() {
+        if(  emit.get() < maxQueries.get() ) {
+            spoutOutputCollector.emit(new Values(stationID, "38°C"), 
emit.incrementAndGet());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
new file mode 100644
index 0000000..49159bc
--- /dev/null
+++ 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.cassandraunit.CassandraCQLUnit;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import backtype.storm.LocalCluster;
+
+import org.junit.Rule;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public abstract class BaseTopologyTest {
+
+    @Rule
+    public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new 
ClassPathCQLDataSet("schema.cql","weather"));
+
+    protected void runLocalTopologyAndWait(TopologyBuilder builder) {
+        LocalCluster cluster = new LocalCluster();
+        StormTopology topology = builder.createTopology();
+        Config config = getConfig();
+        cluster.submitTopology("my-cassandra-topology", config, topology);
+
+        Utils.sleep(TimeUnit.SECONDS.toMillis(15));
+
+        cluster.killTopology("my-cassandra-topology");
+        cluster.shutdown();
+    }
+
+    protected Config getConfig() {
+        Config config = new Config();
+        config.put("cassandra.keyspace", "weather");
+        config.put("cassandra.nodes", "localhost");
+        config.put("cassandra.port", "9142");
+        return config;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
new file mode 100644
index 0000000..8d80ee1
--- /dev/null
+++ 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.topology.TopologyBuilder;
+import com.datastax.driver.core.ResultSet;
+import org.apache.storm.cassandra.WeatherSpout;
+import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+
+
+/**
+ *
+ */
+public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
+
+    public static final String SPOUT_MOCK  = "spout-mock";
+    public static final String BOLT_WRITER = "writer";
+
+    @Test
+    public void 
shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
+        executeAndAssertWith(100000, new 
BatchCassandraWriterBolt(getInsertInto()));
+    }
+
+    private SimpleCQLStatementTupleMapper getInsertInto() {
+        return insertInto("weather", 
"temperature").values(with(field("weatherstation_id"), 
field("event_time").now(), field("temperature"))).build();
+    }
+
+    protected void executeAndAssertWith(final int maxQueries, final 
BaseCassandraBolt bolt) {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1)
+                .setMaxTaskParallelism(1);
+
+        builder.setBolt(BOLT_WRITER, bolt, 4)
+                .shuffleGrouping(SPOUT_MOCK);
+
+        runLocalTopologyAndWait(builder);
+
+        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM 
weather.temperature WHERE weatherstation_id='test'");
+        Assert.assertEquals(maxQueries, rows.all().size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
new file mode 100644
index 0000000..e1a9e9f
--- /dev/null
+++ 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.topology.TopologyBuilder;
+import com.datastax.driver.core.ResultSet;
+import org.apache.storm.cassandra.WeatherSpout;
+import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+
+/**
+ *
+ */
+public class CassandraWriterBoltTest extends BaseTopologyTest {
+
+    public static final String SPOUT_MOCK  = "spout-mock";
+    public static final String BOLT_WRITER = "writer";
+
+    @Test
+    public void 
shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
+        executeAndAssertWith(100000, new 
CassandraWriterBolt((getInsertInto())));
+    }
+
+    private SimpleCQLStatementTupleMapper getInsertInto() {
+        return insertInto("weather", 
"temperature").values(with(field("weatherstation_id"), 
field("event_time").now(), field("temperature"))).build();
+    }
+
+    protected void executeAndAssertWith(final int maxQueries, final 
BaseCassandraBolt bolt) {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1)
+                .setMaxTaskParallelism(1);
+
+        builder.setBolt(BOLT_WRITER, bolt, 4)
+                .shuffleGrouping(SPOUT_MOCK);
+
+        runLocalTopologyAndWait(builder);
+
+        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM 
weather.temperature WHERE weatherstation_id='test'");
+        Assert.assertEquals(maxQueries, rows.all().size());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/resources/schema.cql
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/resources/schema.cql 
b/external/storm-cassandra/src/test/resources/schema.cql
new file mode 100644
index 0000000..061cd5a
--- /dev/null
+++ b/external/storm-cassandra/src/test/resources/schema.cql
@@ -0,0 +1,7 @@
+CREATE TABLE temperature (
+    weatherstation_id   TEXT,
+    event_time          TIMEUUID,
+    temperature         TEXT,
+    PRIMARY KEY(weatherstation_id, event_time)
+)
+

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01a12fa..f64b81c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,6 +234,7 @@
         <module>external/storm-elasticsearch</module>
         <module>external/storm-solr</module>
         <module>external/storm-metrics</module>
+        <module>external/storm-cassandra</module>
         <module>examples/storm-starter</module>
     </modules>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml 
b/storm-dist/binary/src/main/assembly/binary.xml
index 4cd6911..ef72592 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -245,7 +245,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
-
+        <fileSet>
+            
<directory>${project.basedir}/../../external/storm-cassandra/target</directory>
+            <outputDirectory>external/storm-cassandra</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            
<directory>${project.basedir}/../../external/storm-cassandra</directory>
+            <outputDirectory>external/storm-cassandra</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
         <!-- $STORM_HOME/extlib -->
         <fileSet>

Reply via email to