Repository: storm Updated Branches: refs/heads/master 0acc1cee6 -> 224c846ec
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java new file mode 100644 index 0000000..df00530 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.context; + +import java.io.Serializable; +import java.util.Map; + +/** + * Simple interface used for providing services based on the storm configuration. + */ +public interface BeanFactory<T> extends Serializable { + + /** + * Sets the storm context. + * @param context + */ + public void setStormContext(WorkerCtx context); + + /** + * Return an instance, which may be shared or independent, of the specified type. 
+ * @param stormConf The storm configuration + * @return + */ + T get(Map<String, Object> stormConf); + + + /** + * Returns a new copy of this factory. + * @return a new {@link BeanFactory} instance. + */ + public BeanFactory<T> newInstance(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java new file mode 100644 index 0000000..384d84c --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.context; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Simple class used to register singletons within a storm worker. 
+ */ +public class WorkerCtx implements Serializable { + + private static final ConcurrentMap<Class, BeanFactory<?>> workerCtx = new ConcurrentHashMap<>(); + + private Map<Class, BeanFactory<?>> componentCtx = new HashMap<>(); + + /** + * Creates a new {@link WorkerCtx} instance. + */ + public WorkerCtx() { + super(); + } + + /** + * Register a bean provider for a specified type. + */ + public <T> void register(Class<T> clazz, BeanFactory<T> provider) { + componentCtx.put(clazz, provider); + } + + /** + * Return an instance provider of the specified type. + * @throws RuntimeException if no bean provider can be resolved for the given class. + * @return + */ + protected <T> BeanFactory<T> getBeanfactory(Class<T> clazz) { + BeanFactory<T> factory = (BeanFactory<T>) this.componentCtx.get(clazz); + if( factory == null) throw new RuntimeException("Cannot resolve bean factory for class : " + clazz.getCanonicalName()); + factory.setStormContext(this); + return factory; + } + + /** + * Return an instance, which is shared (within a Worker), of the specified type. + * @return + */ + public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> stormConf) { + return getWorkerBean(clazz, stormConf,false); + } + + /** + * Return an instance, which is shared (within a Worker), of the specified type. + * + * @param clazz the class of the bean. + * @param stormConf the storm configuration + * @param force if <code>true</code>, create a new instance even if one already exists. + * + * @return an instance of type {@link T}. 
+ */ + public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> stormConf, boolean force) { + if( force ) workerCtx.remove(clazz); + BeanFactory<T> factory = (BeanFactory<T>) this.workerCtx.get(clazz); + if( factory == null) { + BeanFactory<T> instance = getBeanfactory(clazz).newInstance(); + workerCtx.putIfAbsent(clazz, instance); + factory = (BeanFactory<T>) this.workerCtx.get(clazz); + } + return factory.get((Map<String, Object>)stormConf); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java new file mode 100644 index 0000000..311ed11 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.executor; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Service that asynchronously executes cassandra statements. + */ +public class AsyncExecutor<T> implements Serializable { + + private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class); + + protected Session session; + + protected ExecutorService executorService; + + protected AsyncResultHandler<T> handler; + + private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( ); + + /** + * Creates a new {@link AsyncExecutor} instance. + */ + protected AsyncExecutor(Session session, AsyncResultHandler<T> handler) { + this(session, newSingleThreadExecutor(), handler); + } + + /** + * Creates a new {@link AsyncExecutor} instance. + * + * @param session The cassandra session. + * @param executorService The executor service responsible to execute handler. + */ + private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler<T> handler) { + this.session = session; + this.executorService = executorService; + this.handler = handler; + } + + protected static ExecutorService newSingleThreadExecutor() { + return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build()); + } + + /** + * Asynchronously executes all statements associated to the specified input. The input will be passed to + * the {@link #handler} once all queries succeed or fail. 
+ */ + public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) { + + List<SettableFuture<T>> settableFutures = new ArrayList<>(statements.size()); + + for(Statement s : statements) + settableFutures.add(execAsync(s, input, AsyncResultHandler.NO_OP_HANDLER)); + + ListenableFuture<List<T>> allAsList = Futures.allAsList(settableFutures); + Futures.addCallback(allAsList, new FutureCallback<List<T>>(){ + @Override + public void onSuccess(List<T> inputs) { + handler.success(input); + } + + @Override + public void onFailure(Throwable t) { + handler.failure(t, input); + } + }, executorService); + return settableFutures; + } + + /** + * Asynchronously executes the specified batch statement. Inputs will be passed to + * the {@link #handler} once query succeed or failed. + */ + public SettableFuture<T> execAsync(final Statement statement, final T inputs) { + return execAsync(statement, inputs, handler); + } + /** + * Asynchronously executes the specified batch statement. Inputs will be passed to + * the {@link #handler} once query succeed or failed. + */ + public SettableFuture<T> execAsync(final Statement statement, final T inputs, final AsyncResultHandler<T> handler) { + final SettableFuture<T> settableFuture = SettableFuture.create(); + pending.put(settableFuture, true); + ResultSetFuture future = session.executeAsync(statement); + Futures.addCallback(future, new FutureCallback<ResultSet>() { + public void release() { + pending.remove(settableFuture); + } + + @Override + public void onSuccess(ResultSet result) { + release(); + settableFuture.set(inputs); + handler.success(inputs); + } + + @Override + public void onFailure(Throwable t) { + LOG.error(String.format("Failed to execute statement '%s' ", statement), t); + release(); + settableFuture.setException(t); + handler.failure(t, inputs); + } + }, executorService); + return settableFuture; + } + + /** + * Returns the number of currently executed tasks which are not yet completed. 
+ */ + public int getPendingExec( ) { + return this.pending.size(); + } + + public void shutdown( ) { + if( ! executorService.isShutdown() ) { + LOG.info("shutting down async handler executor"); + this.executorService.shutdownNow(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java new file mode 100644 index 0000000..0c684c0 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.executor; + +import com.datastax.driver.core.Session; + +/** + * This class must be used to obtain a single instance of {@link AsyncExecutor} per storm executor. 
+ */ +public class AsyncExecutorProvider { + + private static final ThreadLocal<AsyncExecutor> localAsyncExecutor = new ThreadLocal<>(); + + /** + * Returns a new {@link AsyncExecutor} per storm executor. + */ + public static <T> AsyncExecutor getLocal(Session session, AsyncResultHandler<T> handler) { + AsyncExecutor<T> executor = localAsyncExecutor.get(); + if( executor == null ) { + localAsyncExecutor.set(executor = new AsyncExecutor<>(session, handler)); + } + return executor; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java new file mode 100644 index 0000000..9b51696 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.executor; + +import backtype.storm.task.OutputCollector; + +import java.io.Serializable; + +/** + * Default handler for batch asynchronous execution. + */ +public interface AsyncResultHandler<T> extends Serializable { + + public static final AsyncResultHandler NO_OP_HANDLER = new AsyncResultHandler() { + @Override + public void failure(Throwable t, Object inputs) { + /** no-operation **/ + } + + @Override + public void success(Object inputs) { + /** no-operation **/ + } + + @Override + public void flush(OutputCollector collector) { + throw new UnsupportedOperationException(); + } + }; + + /** + * This method is responsible for failing specified inputs. + * + * @param t The cause of the failure. + * @param inputs The input tuple processed. + */ + void failure(Throwable t, T inputs); + + /** + * This method is responsible for acknowledging specified inputs. + * + * @param inputs The input tuple processed. + */ + void success(T inputs) ; + + void flush(OutputCollector collector); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java new file mode 100644 index 0000000..d0f5e1d --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.executor; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import com.google.common.collect.Lists; +import org.apache.storm.cassandra.ExecutionResultHandler; + +import java.util.List; + +/** + * This class is responsible to collect input tuples proceed. + */ +public interface ExecutionResultCollector { + + void handle(OutputCollector collector, ExecutionResultHandler handler); + + public static final class SucceedCollector implements ExecutionResultCollector { + + private final List<Tuple> inputs; + + /** + * Creates a new {@link ExecutionResultCollector} instance. + * @param input the input tuple. + */ + public SucceedCollector(Tuple input) { + this(Lists.newArrayList(input)); + } + + /** + * Creates a new {@link ExecutionResultCollector} instance. + * @param inputs the input tuple. + */ + public SucceedCollector(List<Tuple> inputs) { + this.inputs = inputs; + } + + /** + * Calls {@link ExecutionResultHandler#onQuerySuccess(backtype.storm.task.OutputCollector, backtype.storm.tuple.Tuple)} before + * acknowledging an single input tuple. 
+ */ + @Override + public void handle(OutputCollector collector, ExecutionResultHandler handler) { + for(Tuple t : inputs) handler.onQuerySuccess(collector, t); + for(Tuple t : inputs) collector.ack(t); + } + } + + public static final class FailedCollector implements ExecutionResultCollector { + + private final Throwable cause; + private final List<Tuple> inputs; + + /** + * Creates a new {@link FailedCollector} instance. + * @param input the input tuple. + * @param cause the cause of the error. + */ + public FailedCollector(Tuple input, Throwable cause) { + this(Lists.newArrayList(input), cause); + } + + /** + * Creates a new {@link FailedCollector} instance. + * @param inputs the input tuple. + * @param cause the cause of the error. + */ + public FailedCollector(List<Tuple> inputs, Throwable cause) { + this.inputs = inputs; + this.cause = cause; + } + + /** + * Calls {@link ExecutionResultHandler#onThrowable(Throwable, backtype.storm.task.OutputCollector, backtype.storm.tuple.Tuple)} . + */ + @Override + public void handle(OutputCollector collector, ExecutionResultHandler handler) { + handler.onThrowable(cause, collector, inputs); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java new file mode 100644 index 0000000..c81da8c --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.executor.impl; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import org.apache.storm.cassandra.ExecutionResultHandler; +import org.apache.storm.cassandra.executor.AsyncResultHandler; +import org.apache.storm.cassandra.executor.ExecutionResultCollector; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + + +public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>> { + + private ConcurrentLinkedQueue<ExecutionResultCollector> completed; + + private ExecutionResultHandler handler; + + /** + * Creates a new {@link BatchAsyncResultHandler} instance. + * @param handler + */ + public BatchAsyncResultHandler(ExecutionResultHandler handler) { + this.handler = handler; + this.completed = new ConcurrentLinkedQueue<>(); + } + + /** + * This method is responsible for failing specified inputs. + * + * The default method does no-operation. + */ + public void failure(Throwable t, List<Tuple> input) { + completed.offer(new ExecutionResultCollector.FailedCollector(input, t)); + } + /** + * This method is responsible for acknowledging specified inputs. + * + * The default method does no-operation. 
+ */ + public void success(List<Tuple> input) { + completed.offer(new ExecutionResultCollector.SucceedCollector(input)); + } + + /** + * {@inheritDoc} + */ + @Override + public void flush(final OutputCollector collector) { + ExecutionResultCollector poll; + while( (poll = completed.poll()) != null ) { + poll.handle(collector, handler); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java new file mode 100644 index 0000000..62a5a3b --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.executor.impl; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import org.apache.storm.cassandra.ExecutionResultHandler; +import org.apache.storm.cassandra.executor.AsyncResultHandler; +import org.apache.storm.cassandra.executor.ExecutionResultCollector; + +import java.util.concurrent.ConcurrentLinkedQueue; + + +public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> { + + private ConcurrentLinkedQueue<ExecutionResultCollector> completed; + + private ExecutionResultHandler handler; + + /** + * Creates a new {@link SingleAsyncResultHandler} instance. + * @param handler + */ + public SingleAsyncResultHandler(ExecutionResultHandler handler) { + this.handler = handler; + this.completed = new ConcurrentLinkedQueue<>(); + } + + /** + * This method is responsible for failing specified inputs. + * + * The default method does no-operation. + */ + public void failure(Throwable t, Tuple input) { + completed.offer(new ExecutionResultCollector.FailedCollector(input, t)); + } + /** + * This method is responsible for acknowledging specified inputs. + * + * The default method does no-operation. 
+ */ + public void success(Tuple input) { + completed.offer(new ExecutionResultCollector.SucceedCollector(input)); + } + + /** + * {@inheritDoc} + */ + @Override + public void flush(final OutputCollector collector) { + ExecutionResultCollector poll; + while( (poll = completed.poll()) != null ) { + poll.handle(collector, handler); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java new file mode 100644 index 0000000..32ff1de --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.query; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + + +public class BatchStatementTupleMapper implements CQLStatementTupleMapper { + + private final List<CQLStatementTupleMapper> mappers; + private final BatchStatement.Type type; + + /** + * Creates a new {@link BatchStatementTupleMapper} instance. + * @param type + * @param mappers + */ + public BatchStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) { + this.mappers = new ArrayList<>(mappers); + this.type = type; + } + + /** + * {@inheritDoc} + */ + @Override + public List<Statement> map(Map conf, Session session, ITuple tuple) { + final BatchStatement batch = new BatchStatement(this.type); + for(CQLStatementTupleMapper m : mappers) + batch.addAll(m.map(conf, session, tuple)); + return Arrays.asList((Statement)batch); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java new file mode 100644 index 0000000..baa4685 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.querybuilder.Clause; + +import java.io.Serializable; +import java.util.List; + +/** + * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to + * a list of cassandra {@link com.datastax.driver.core.querybuilder.Clause}s. + * + */ +public interface CQLClauseTupleMapper extends Serializable { + + List<Clause> map(ITuple tuple); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java new file mode 100644 index 0000000..6c04d1c --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query; + +import java.io.Serializable; + + +public interface CQLStatementBuilder extends Serializable { + + /** + * Builds a new {@link CQLStatementTupleMapper} instance. + * @return a new CQLStatementMapper instance. + */ + <T extends CQLStatementTupleMapper> T build(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java new file mode 100644 index 0000000..32c86dd --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Insert; +import com.google.common.base.Optional; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; + +/** + * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}. + * + */ +public interface CQLStatementTupleMapper extends Serializable { + + public static final String FIELD_KEYSPACE = "keyspace"; + public static final String FIELD_TABLE = "table"; + public static final String FIELD_VALUES = "value"; + + /** + * Maps a given tuple to one or multiple CQL statements. + * + * @param conf the storm configuration map. + * @param session the cassandra session. + * @param tuple the incoming tuple to map. + * @return a list of {@link com.datastax.driver.core.Statement}. + */ + List<Statement> map(Map conf, Session session, ITuple tuple); + + public static class InsertCQLStatementTupleMapper implements CQLStatementTupleMapper { + @Override + public List<Statement> map(Map conf, Session session, ITuple tuple) { + Optional<String> ks = Optional.fromNullable(tuple.contains(FIELD_KEYSPACE) ? 
tuple.getStringByField(FIELD_KEYSPACE) : null); + String table = tuple.getStringByField(FIELD_TABLE); + Map<String, Object> values = (Map<String, Object>) tuple.getValueByField(FIELD_VALUES); + + final Insert stmt = (ks.isPresent()) ? insertInto(ks.get(), table) : insertInto(table); + for(Map.Entry<String, Object> v : values.entrySet()) + stmt.value(v.getKey(), v.getValue()); + + return Arrays.asList((Statement)stmt); + } + } + + public static class DynamicCQLStatementTupleMapper implements CQLStatementTupleMapper { + private List<CQLStatementBuilder> builders; + + public DynamicCQLStatementTupleMapper(List<CQLStatementBuilder> builders) { + this.builders = builders; + } + + @Override + public List<Statement> map(Map conf, Session session, ITuple tuple) { + List<Statement> statements = new LinkedList<>(); + for(CQLStatementBuilder b : builders) { + statements.addAll(b.build().map(conf, session, tuple)); + } + return statements; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java new file mode 100644 index 0000000..574eac9 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to a table name.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations have to be
+ * serializable as well.
+ */
+public interface CQLTableTupleMapper extends Serializable {
+
+    /**
+     * Returns a table's name from the specified tuple.
+     *
+     * @param tuple the incoming tuple.
+     * @return the table name.
+     */
+    String map(ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
new file mode 100644
index 0000000..2cb8e4c
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query; + +import backtype.storm.tuple.ITuple; +import org.apache.storm.cassandra.query.selector.FieldSelector; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to Map of values of a CQL statement. + * + */ +public interface CQLValuesTupleMapper extends Serializable { + + /** + * Map the specified {@code tuple} to values of CQL statement(s). + * @param tuple the incoming tuple to map. + * @return values of CQL statement(s) + */ + Map<String, Object> map(ITuple tuple); + + /** + * Default {@link CQLValuesTupleMapper} implementation to get specifics tuple's fields. + */ + public static class WithFieldTupleMapper implements CQLValuesTupleMapper { + private List<FieldSelector> fields; + + public WithFieldTupleMapper(List<FieldSelector> fields) { + this.fields = fields; + } + + @Override + public Map<String, Object> map(ITuple tuple) { + Map<String, Object> ret = new LinkedHashMap<>(); + for(FieldSelector fs : fields) + fs.selectAndPut(tuple, ret); + return ret; + } + } + + /** + * Default {@link CQLValuesTupleMapper} implementation to get all tuple's fields. 
+ */ + public static class AllTupleMapper implements CQLValuesTupleMapper { + @Override + public Map<String, Object> map(ITuple tuple) { + Map<String, Object> ret = new LinkedHashMap<>(); + for(String name : tuple.getFields()) + ret.put(name, tuple.getValueByField(name)); + return ret; + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java new file mode 100644 index 0000000..4cc4c49 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java @@ -0,0 +1,83 @@ +package org.apache.storm.cassandra.query; + +import backtype.storm.tuple.ITuple; + +import java.io.Serializable; +import java.util.Map; + +/** + * This interface may be used to retrieve a cassandra bound query either from storm config + * or the tuple being proceed. + * + */ +public interface ContextQuery extends Serializable { + + /** + * Resolves a cassandra bound query. + * + * @param config the storm configuration + * @param tuple the tuple being proceed. + * + * @return a string bound query. + */ + public String resolves(Map config, ITuple tuple); + + /** + * Static implementation of {@link ContextQuery} interface. + */ + public static final class StaticContextQuery implements ContextQuery { + private final String value; + + /** + * Creates a new {@link StaticContextQuery} instance. + * @param value + */ + public StaticContextQuery(String value) { + this.value = value; + } + + @Override + public String resolves(Map config, ITuple tuple) { + return value; + } + } + + /** + * Default {@link BoundQueryContext} implementation to retrieve a bound query + * identified by the provided key. 
+ */ + public static final class BoundQueryContext implements ContextQuery { + private String key; + + public BoundQueryContext(String key) { + this.key = key; + } + + @Override + public String resolves(Map config, ITuple tuple) { + if (config.containsKey(key)) return (String) config.get(key); + + throw new IllegalArgumentException("Bound query '" + key + "' does not exist in configuration"); + } + } + + /** + * Default {@link BoundQueryNamedByFieldContext} implementation to retrieve a bound query named by + * the value of a specified tuple field. + */ + public static final class BoundQueryNamedByFieldContext implements ContextQuery { + + private String fieldName; + + public BoundQueryNamedByFieldContext(String fieldName) { + this.fieldName = fieldName; + } + + @Override + public String resolves(Map config, ITuple tuple) { + String name = tuple.getStringByField(fieldName); + if (config.containsKey(name)) return (String) config.get(name); + throw new IllegalArgumentException("Bound query '" + name + "' does not exist in configuration"); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java new file mode 100644 index 0000000..683824e --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}. + * + */ +public abstract class SimpleCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable { + + /** + * {@inheritDoc} + */ + @Override + public List<Statement> map(Map conf, Session session, ITuple tuple) { + return Arrays.asList(map(tuple)); + } + + /** + * Maps a given tuple to a single CQL statements. + * + * @param tuple the incoming tuple to map. + * @return a list of {@link com.datastax.driver.core.Statement}. 
+ */ + public abstract Statement map(ITuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java new file mode 100644 index 0000000..4348253 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.query.impl; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.apache.storm.cassandra.query.CQLValuesTupleMapper; +import org.apache.storm.cassandra.query.ContextQuery; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.storm.cassandra.query.ContextQuery.*; + + +public class BoundStatementMapperBuilder implements Serializable { + private final ContextQuery contextQuery; + + /** + * Creates a new {@link BoundStatementMapperBuilder} instance. + * @param cql + */ + public BoundStatementMapperBuilder(String cql) { + this.contextQuery = new StaticContextQuery(cql); + } + + /** + * Creates a new {@link BoundStatementMapperBuilder} instance. + * @param contextQuery + */ + public BoundStatementMapperBuilder(ContextQuery contextQuery) { + this.contextQuery = contextQuery; + } + + public CQLStatementTupleMapper bind(final CQLValuesTupleMapper mapper) { + return new CQLBoundStatementTupleMapper(contextQuery, mapper); + } + + public static class CQLBoundStatementTupleMapper implements CQLStatementTupleMapper { + + private final ContextQuery contextQuery; + + private final CQLValuesTupleMapper mapper; + + private Map<String, PreparedStatement> cache = new HashMap<>(); + + /** + * Creates a new {@link CQLBoundStatementTupleMapper} instance. 
+ * + * @param contextQuery + * @param mapper + */ + CQLBoundStatementTupleMapper(ContextQuery contextQuery, CQLValuesTupleMapper mapper) { + this.contextQuery = contextQuery; + this.mapper = mapper; + } + + /** + * {@inheritDoc} + */ + @Override + public List<Statement> map(Map config, Session session, ITuple tuple) { + final Map<String, Object> values = mapper.map(tuple); + final String query = contextQuery.resolves(config, tuple); + Object[] objects = values.values().toArray(new Object[values.size()]); + PreparedStatement statement = getPreparedStatement(session, query); + return Arrays.asList((Statement)statement.bind(objects)); + } + + /** + * Get or prepare a statement using the specified session and the query. + * * + * @param session The cassandra session. + * @param query The CQL query to prepare. + */ + private PreparedStatement getPreparedStatement(Session session, String query) { + PreparedStatement statement = cache.get(query); + if( statement == null) { + statement = session.prepare(query); + cache.put(query, statement); + } + return statement; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java new file mode 100644 index 0000000..8eddb04 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query.impl; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.base.Preconditions; +import org.apache.storm.cassandra.query.CQLStatementBuilder; +import org.apache.storm.cassandra.query.CQLTableTupleMapper; +import org.apache.storm.cassandra.query.CQLValuesTupleMapper; +import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class InsertStatementBuilder<T extends CQLValuesTupleMapper> implements CQLStatementBuilder { + /** + * Optional name of the table into insert. + */ + protected final String table; + /** + * Optional keyspace name to insert. + */ + protected final String keyspace; + /** + * Optional mapper to retrieve a table name from tuple. + */ + protected final CQLTableTupleMapper mapper2TableName; + + protected T valuesMapper; + + protected boolean ifNotExist; + + protected Integer ttlInSeconds; + + protected ConsistencyLevel level; + + /** + * Creates a new {@link InsertStatementBuilder} instance. + * + * @param table name of the table into insert. 
+ */ + public InsertStatementBuilder(String table) { + this(null, table, null); + } + + /** + * Creates a new {@link InsertStatementBuilder} instance. + * + * @param table name of the table into insert. + * @param keyspace the keyspace's name to which the table belongs + */ + public InsertStatementBuilder(String table, String keyspace) { + this(keyspace, table, null); + } + + /** + * Creates a new {@link InsertStatementBuilder} instance. + * + * @param mapper2TableName the mapper that specify the table in which insert + * @param keyspace the name of the keyspace to which the table belongs + */ + public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName, String keyspace) { + this(keyspace, null, mapper2TableName); + } + /** + * Creates a new {@link InsertStatementBuilder} instance. + * + * @param mapper2TableName the mapper that specify the table in which insert + */ + public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName) { + this(null, null, mapper2TableName); + } + + private InsertStatementBuilder(String keyspace, String table, CQLTableTupleMapper mapper2TableName) { + this.keyspace = keyspace; + this.table = table; + this.mapper2TableName = mapper2TableName; + } + + /** + * Adds "IF NOT EXISTS" clause to the statement. + * @see com.datastax.driver.core.querybuilder.Insert#ifNotExists() + */ + public InsertStatementBuilder ifNotExists() { + this.ifNotExist = true; + return this; + } + + public InsertStatementBuilder usingTTL(Long time, TimeUnit unit) { + this.ttlInSeconds = (int) unit.toSeconds(time); + return this; + } + + /** + * Sets the consistency level used for this statement. + * + * @param level a ConsistencyLevel. + */ + public InsertStatementBuilder withConsistencyLevel(ConsistencyLevel level) { + this.level = level; + return this; + } + + /** + * Maps tuple to values. 
+ * + * @see com.datastax.driver.core.querybuilder.Insert#value(String, Object) + */ + public InsertStatementBuilder values(final T valuesMapper) { + this.valuesMapper = valuesMapper; + return this; + } + + /** + * {@inheritDoc} + */ + @Override + public SimpleCQLStatementTupleMapper build() { + Preconditions.checkState(null != table, "Table name must not be null"); + return new SimpleCQLStatementTupleMapper() { + @Override + public Statement map(ITuple tuple) { + Insert stmt = QueryBuilder.insertInto(keyspace, table); + for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet()) + stmt.value(entry.getKey(), entry.getValue()); + if (ttlInSeconds != null) stmt.using(QueryBuilder.ttl(ttlInSeconds)); + if (ifNotExist) stmt.ifNotExists(); + if (level != null) stmt.setConsistencyLevel(level); + return stmt; + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java new file mode 100644 index 0000000..70696ab --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query.impl; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Update; +import org.apache.storm.cassandra.query.CQLClauseTupleMapper; +import org.apache.storm.cassandra.query.CQLStatementBuilder; +import org.apache.storm.cassandra.query.CQLValuesTupleMapper; +import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper; + +import java.util.Map; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.set; + +public final class UpdateStatementBuilder implements CQLStatementBuilder { + + /** + * The name of table to update. + */ + private final String table; + /** + * The keyspace of the table. + */ + private final String keyspace; + + private CQLValuesTupleMapper valuesMapper; + + private CQLClauseTupleMapper clausesMapper; + + private CQLClauseTupleMapper onlyIfClausesMapper; + /** + * Creates a new {@link UpdateStatementBuilder} instance. + * @param table the name of table to update. + */ + public UpdateStatementBuilder(String table) { + this(table, null); + } + + /** + * Creates a new {@link UpdateStatementBuilder} instance. + * @param table the name of table to update. + * @param keyspace the keyspace of the table. + */ + public UpdateStatementBuilder(String table, String keyspace) { + this.table = table; + this.keyspace = keyspace; + } + + /** + * Maps output tuple to values. 
+ * @see com.datastax.driver.core.querybuilder.Update#with(com.datastax.driver.core.querybuilder.Assignment) + */ + public UpdateStatementBuilder with(final CQLValuesTupleMapper values) { + this.valuesMapper = values; + return this; + } + + /** + * Maps output tuple to some Where clauses. + * @see com.datastax.driver.core.querybuilder.Update#where(com.datastax.driver.core.querybuilder.Clause) + */ + public UpdateStatementBuilder where(final CQLClauseTupleMapper clauses) { + this.clausesMapper = clauses; + return this; + } + + /** + * Maps output tuple to some If clauses. + * @see com.datastax.driver.core.querybuilder.Update#onlyIf(com.datastax.driver.core.querybuilder.Clause) + */ + public UpdateStatementBuilder onlyIf(final CQLClauseTupleMapper clauses) { + this.onlyIfClausesMapper = clauses; + return this; + } + + @Override + public SimpleCQLStatementTupleMapper build() { + return new SimpleCQLStatementTupleMapper() { + @Override + public Statement map(ITuple tuple) { + Update stmt = QueryBuilder.update(keyspace, table); + for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet()) + stmt.with(set(entry.getKey(), entry.getValue())); + for(Clause clause : clausesMapper.map(tuple)) + stmt.where(clause); + if( hasIfClauses()) + for(Clause clause : onlyIfClausesMapper.map(tuple)) { + stmt.onlyIf(clause); + } + return stmt; + } + }; + } + + private boolean hasIfClauses() { + return onlyIfClausesMapper != null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java new file mode 100644 index 0000000..b68138a --- /dev/null +++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.query.selector; + +import backtype.storm.tuple.ITuple; +import com.datastax.driver.core.utils.UUIDs; + +import java.io.Serializable; +import java.util.Map; + + +public class FieldSelector implements Serializable { + + private String as; + + private String field; + + private boolean isNow; + + /** + * Creates a new {@link FieldSelector} instance. + * @param field the name of value. + */ + public FieldSelector(String field) { + this.field = field; + } + + public void selectAndPut(ITuple t, Map<String, Object> values) { + values.put(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field)); + + } + + /** + * Sets the fields as an automatically generated TimeUUID. + * @return this. + */ + public FieldSelector now() { + this.isNow = true; + return this; + } + + /** + * Sets an alias for this field. + * + * @param as the alias name. + * @return this. 
+ */ + public FieldSelector as(String as) { + this.as = as; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java new file mode 100644 index 0000000..22e12da --- /dev/null +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra; + +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Statement; +import com.google.common.collect.Maps; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.cassandraunit.CassandraCQLUnit; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import static org.apache.storm.cassandra.DynamicStatementBuilder.*; +import static org.mockito.Mockito.when; + +public class DynamicStatementBuilderTest { + + private static final Date NOW = new Date(); + + private static final Tuple mockTuple; + + @Rule + public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather")); + + static { + mockTuple = Mockito.mock(Tuple.class); + when(mockTuple.getValueByField("weatherstation_id")).thenReturn("1"); + when(mockTuple.getValueByField("event_time")).thenReturn(NOW); + when(mockTuple.getValueByField("temperature")).thenReturn("0°C"); + when(mockTuple.getFields()).thenReturn(new Fields("weatherstation_id", "event_time", "temperature")); + } + + @Test + public void shouldBuildMultipleStaticInsertStatementGivenKeyspaceAndAllMapper() { + executeStatementAndAssert( + async( + insertInto("weather", "temperature").values(all()), + insertInto("weather", "temperature").values(all()) + ), + "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');", + "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');" + ); + } + + @Test + public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() { + executeStatementAndAssert(insertInto("weather", 
"temperature").values(all()).build(), + "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"); + } + + @Test + public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() { + executeStatementAndAssert(insertInto("temperature").values(all()).build(), + "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"); + } + + @Test + public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() { + executeStatementAndAssert(insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature"))).build(), + "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"); + } + + @Test + public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() { + executeBatchStatementAndAssert(loggedBatch( + insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature"))) + ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"); + } + + @Test + public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() { + executeBatchStatementAndAssert(unLoggedBatch( + insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature"))) + ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"); + } + + private void executeBatchStatementAndAssert(CQLStatementTupleMapper mapper, String... 
results) { + List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple); + + BatchStatement statement = (BatchStatement)map.get(0); + Collection<Statement> statements = statement.getStatements(); + Assert.assertEquals(results.length, statements.size()); + + for(Statement s : statements) + Assert.assertTrue(Arrays.asList(results).contains(s.toString())); + } + + + private void executeStatementAndAssert(CQLStatementTupleMapper mapper, String... expected) { + List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple); + + List<String> listExpected = Arrays.asList(expected); + for( int i=0; i< map.size(); i++) { + Assert.assertEquals(listExpected.get(i), map.get(i).toString()); + } + } + + @Test + public void shouldBuildStaticBoundStatement() { + CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weatherstation_id, event_time, temperature) VALUES(?, ?, ?)") + .bind(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))); + List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple); + Statement statement = map.get(0); + Assert.assertNotNull(statement); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java new file mode 100644 index 0000000..49a1f80 --- /dev/null +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import org.junit.Assert; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class WeatherSpout extends BaseRichSpout { + + private SpoutOutputCollector spoutOutputCollector; + + private String stationID; + + private AtomicLong maxQueries; + + private AtomicLong acks = new AtomicLong(0); + + private AtomicLong emit = new AtomicLong(0); + + /** + * Creates a new {@link WeatherSpout} instance. + * @param stationID The station ID. 
+ */ + public WeatherSpout(String stationID, int maxQueries) { + this.stationID = stationID; + this.maxQueries = new AtomicLong(maxQueries); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("weatherstation_id", "temperature")); + } + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + } + + @Override + public void ack(Object msgId) { + acks.incrementAndGet(); + } + + @Override + public void fail(Object msgId) { + Assert.fail("Must never get fail tuple : " + msgId); + } + + @Override + public void close() { + Assert.assertEquals(acks.get(), emit.get()); + } + + @Override + public void nextTuple() { + if( emit.get() < maxQueries.get() ) { + spoutOutputCollector.emit(new Values(stationID, "38°C"), emit.incrementAndGet()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java new file mode 100644 index 0000000..49159bc --- /dev/null +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.bolt; + +import backtype.storm.Config; +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; +import org.cassandraunit.CassandraCQLUnit; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import backtype.storm.LocalCluster; + +import org.junit.Rule; + +import java.util.concurrent.TimeUnit; + +/** + * + */ +public abstract class BaseTopologyTest { + + @Rule + public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather")); + + protected void runLocalTopologyAndWait(TopologyBuilder builder) { + LocalCluster cluster = new LocalCluster(); + StormTopology topology = builder.createTopology(); + Config config = getConfig(); + cluster.submitTopology("my-cassandra-topology", config, topology); + + Utils.sleep(TimeUnit.SECONDS.toMillis(15)); + + cluster.killTopology("my-cassandra-topology"); + cluster.shutdown(); + } + + protected Config getConfig() { + Config config = new Config(); + config.put("cassandra.keyspace", "weather"); + config.put("cassandra.nodes", "localhost"); + config.put("cassandra.port", "9142"); + return config; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java 
b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java new file mode 100644 index 0000000..8d80ee1 --- /dev/null +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.bolt; + +import backtype.storm.topology.TopologyBuilder; +import com.datastax.driver.core.ResultSet; +import org.apache.storm.cassandra.WeatherSpout; +import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.storm.cassandra.DynamicStatementBuilder.*; + + +/** + * + */ +public class BatchCassandraWriterBoltTest extends BaseTopologyTest { + + public static final String SPOUT_MOCK = "spout-mock"; + public static final String BOLT_WRITER = "writer"; + + @Test + public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() { + executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto())); + } + + private SimpleCQLStatementTupleMapper getInsertInto() { + return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build(); + } + + protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1) + .setMaxTaskParallelism(1); + + builder.setBolt(BOLT_WRITER, bolt, 4) + .shuffleGrouping(SPOUT_MOCK); + + runLocalTopologyAndWait(builder); + + ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'"); + Assert.assertEquals(maxQueries, rows.all().size()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java new file mode 100644 index 
0000000..e1a9e9f --- /dev/null +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.bolt; + +import backtype.storm.topology.TopologyBuilder; +import com.datastax.driver.core.ResultSet; +import org.apache.storm.cassandra.WeatherSpout; +import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.storm.cassandra.DynamicStatementBuilder.*; + +/** + * + */ +public class CassandraWriterBoltTest extends BaseTopologyTest { + + public static final String SPOUT_MOCK = "spout-mock"; + public static final String BOLT_WRITER = "writer"; + + @Test + public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() { + executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto()))); + } + + private SimpleCQLStatementTupleMapper getInsertInto() { + return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build(); + } + + protected void executeAndAssertWith(final int 
maxQueries, final BaseCassandraBolt bolt) { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1) + .setMaxTaskParallelism(1); + + builder.setBolt(BOLT_WRITER, bolt, 4) + .shuffleGrouping(SPOUT_MOCK); + + runLocalTopologyAndWait(builder); + + ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'"); + Assert.assertEquals(maxQueries, rows.all().size()); + } + + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/test/resources/schema.cql ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/resources/schema.cql b/external/storm-cassandra/src/test/resources/schema.cql new file mode 100644 index 0000000..061cd5a --- /dev/null +++ b/external/storm-cassandra/src/test/resources/schema.cql @@ -0,0 +1,7 @@ +CREATE TABLE temperature ( + weatherstation_id TEXT, + event_time TIMEUUID, + temperature TEXT, + PRIMARY KEY(weatherstation_id, event_time) +) + http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 01a12fa..f64b81c 100644 --- a/pom.xml +++ b/pom.xml @@ -234,6 +234,7 @@ <module>external/storm-elasticsearch</module> <module>external/storm-solr</module> <module>external/storm-metrics</module> + <module>external/storm-cassandra</module> <module>examples/storm-starter</module> </modules> http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/storm-dist/binary/src/main/assembly/binary.xml ---------------------------------------------------------------------- diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml index 4cd6911..ef72592 100644 --- a/storm-dist/binary/src/main/assembly/binary.xml +++ b/storm-dist/binary/src/main/assembly/binary.xml @@ -245,7 +245,20 @@ 
<include>README.*</include> </includes> </fileSet> - + <fileSet> + <directory>${project.basedir}/../../external/storm-cassandra/target</directory> + <outputDirectory>external/storm-cassandra</outputDirectory> + <includes> + <include>storm*jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.basedir}/../../external/storm-cassandra</directory> + <outputDirectory>external/storm-cassandra</outputDirectory> + <includes> + <include>README.*</include> + </includes> + </fileSet> <!-- $STORM_HOME/extlib --> <fileSet>
