[GitHub] storm pull request: STORM-1348 - refactor API to remove Insert/Upd...

2015-12-17 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/929#issuecomment-165393591
  
@harshach can this PR be merged if the modifications are OK with you? Thank 
you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1348 - refactor API to remove Insert/Upd...

2015-12-14 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/929#issuecomment-164410061
  
@satishd done ;)




[GitHub] storm pull request: STORM-1348 - refactor API to remove Insert/Upd...

2015-12-07 Thread fhussonnois
GitHub user fhussonnois opened a pull request:

https://github.com/apache/storm/pull/929

STORM-1348 - refactor API to remove Insert/Update builder in Cassandra 
connector

I still need to add some tests, but I am opening this PR so that its review can start.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhussonnois/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/929.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #929


commit 8eca74586ac852b89614f97ce5cd9429af7e7286
Author: Florian Hussonnois 
Date:   2015-12-06T21:23:29Z

STORM-1348 - refactor API to remove Insert/Update builder in Cassandra 
connector






[GitHub] storm pull request: STORM-1211 Added trident support for Cassandra...

2015-12-03 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/915#discussion_r46620267
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java ---
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class);
+
+    private final Map conf;
+    private final Options options;
+
+    private Session session;
+    private SimpleClient client;
+
+    public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) {
+        this.conf = conf;
+        this.options = options;
+    }
+
+    public static final class Options implements Serializable {
+        private final SimpleClientProvider clientProvider;
+        private CQLStatementTupleMapper cqlStatementTupleMapper;
+        private CQLResultSetValuesMapper cqlResultSetValuesMapper;
+        private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED;
+
+
+        public Options(SimpleClientProvider clientProvider) {
+            this.clientProvider = clientProvider;
+        }
+
+        public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) {
+            this.cqlStatementTupleMapper = cqlStatementTupleMapper;
+            return this;
+        }
+
+        public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+            this.cqlResultSetValuesMapper = cqlResultSetValuesMapper;
+            return this;
+        }
+
+        public Options withBatching(BatchStatement.Type batchingType) {
+            this.batchingType = batchingType;
+            return this;
+        }
+
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop");
+    }
+
+    public void prepare() {
+        Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+
+        client = options.clientProvider.getClient(conf);
+        session = client.connect();
+    }
+
+    public void cleanup() {
+        session.close();
+        client.close();
+    }
+
+    public void updateState(List tuples, final TridentCollector collector) {
--- End diff --

OK, I understand your point, but this may lead to some issues at scale. 
Writes are usually idempotent in Cassandra, so even if a write fails you could 
resend all tuples. Could we imagine two strategies to update the state?



[GitHub] storm pull request: STORM-1211 Added trident support for Cassandra...

2015-12-03 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/915#discussion_r46584477
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java ---
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class);
+
+    private final Map conf;
+    private final Options options;
+
+    private Session session;
+    private SimpleClient client;
+
+    public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) {
+        this.conf = conf;
+        this.options = options;
+    }
+
+    public static final class Options implements Serializable {
+        private final SimpleClientProvider clientProvider;
+        private CQLStatementTupleMapper cqlStatementTupleMapper;
+        private CQLResultSetValuesMapper cqlResultSetValuesMapper;
+        private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED;
+
+
+        public Options(SimpleClientProvider clientProvider) {
+            this.clientProvider = clientProvider;
+        }
+
+        public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) {
+            this.cqlStatementTupleMapper = cqlStatementTupleMapper;
+            return this;
+        }
+
+        public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+            this.cqlResultSetValuesMapper = cqlResultSetValuesMapper;
+            return this;
+        }
+
+        public Options withBatching(BatchStatement.Type batchingType) {
+            this.batchingType = batchingType;
+            return this;
+        }
+
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop");
+    }
+
+    public void prepare() {
+        Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+
+        client = options.clientProvider.getClient(conf);
+        session = client.connect();
+    }
+
+    public void cleanup() {
+        session.close();
+        client.close();
+    }
+
+    public void updateState(List tuples, final TridentCollector collector) {
--- End diff --

I think async writes should be used to update the state. In Cassandra, batch 
statements must not be used to improve write performance. In addition, 
Cassandra will warn if the batch size is too large, and this may overload 
the cluster. Batches must onl

[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-12-02 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-161422378
  
Thank you @harshach @satishd. I will start working on 
https://issues.apache.org/jira/browse/STORM-1348 as soon as possible. BTW, will my 
name be added to the contributors list? ;)




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-12-01 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-161103792
  
@harshach @satishd I added an ignore annotation to skip these tests.




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-12-01 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-160937526
  
@harshach I can't reproduce the error. All tests pass successfully. Did you 
run the tests after merging the branch?




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-24 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-159316692
  
@satishd, @harshach OK, perfect. So I will refactor the API to remove the 
Insert/Update builder. 
Then I will add a simple interface to map tuples to columns: 
```java
public interface CqlMapper extends Serializable {
    List getColumns(ITuple tuple);
}
```
In addition, I will implement a SimpleStatementMapper to build queries as 
follows: 

```java
new CassandraWriterBolt(
  async(
    simpleQuery("INSERT INTO weather.temperature (weatherstation_id, event_time, temperature) VALUES(?,?,?)",
      with(field("weatherstation_id"), field("event_time").now(), field("temperature")))
  )
);
```
The same bolt could be written with QueryBuilder: 
```java
new CassandraWriterBolt(
  async(
    simple(
      QueryBuilder.insertInto("weather", "temperature")
        .value("weatherstation_id", "?")
        .value("event_time", "?")
        .value("temperature", "?"),
      with(field("weatherstation_id"), field("event_time").now(), field("temperature")))
  )
);
```
What do you think about that? 




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-23 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-158930102
  
@satishd I think the fluent API handles the common use cases. Its purpose is to 
ease the Cassandra integration, because creating a CQL statement from a tuple is 
cumbersome. For specific needs, developers can still implement their own 
CQLStatementTupleMapper. So I think it would be a bad idea to remove that API. In 
fact, there is a risk that developers who use the connector will have to 
redevelop a DSL.
I'm not sure, but I think there are not many changes to the Cassandra 
connector API. DataStax seems to focus its development on Cassandra 
internals rather than on query language support.




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-19 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45364971
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Service to asynchronously executes cassandra statements.
+ */
+public class AsyncExecutor implements Serializable {
+
+    private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
+
+    protected Session session;
+
+    protected ExecutorService executorService;
+
+    protected AsyncResultHandler handler;
+
+    private Map, Boolean> pending = new ConcurrentHashMap<>( );
+
+    /**
+     * Creates a new {@link AsyncExecutor} instance.
+     */
+    protected AsyncExecutor(Session session, AsyncResultHandler handler) {
+        this(session, newSingleThreadExecutor(), handler);
+    }
+
+    /**
+     * Creates a new {@link AsyncExecutor} instance.
+     *
+     * @param session The cassandra session.
+     * @param executorService The executor service responsible to execute handler.
+     */
+    private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler handler) {
+        this.session   = session;
+        this.executorService = executorService;
+        this.handler = handler;
+    }
+
+    protected static ExecutorService newSingleThreadExecutor() {
+        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build());
+    }
+
+    /**
+     * Asynchronously executes all statements associated to the specified input. The input will be passed to
+     * the {@link #handler} once all queries succeed or failed.
--- End diff --

Yes, there is an issue with the comment. This behavior is implemented with Guava's Futures.allAsList:

http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/Futures.html#allAsList(java.lang.Iterable)
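
For illustration, the all-or-fail aggregation that this comment refers to can be sketched with the JDK alone. This is not the PR's code: it swaps Guava's Futures.allAsList for java.util.concurrent.CompletableFuture.allOf, and the class name AllOrFailSketch is hypothetical. One difference to keep in mind is that Guava's allAsList fails as soon as any input fails, whereas allOf completes only after every input has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only stand-in for the aggregation done with Guava in the PR. */
public class AllOrFailSketch {

    /**
     * Combines the given futures into one future that yields every result in order,
     * or completes exceptionally if at least one input failed.
     */
    public static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<?>[] inputs = futures.toArray(new CompletableFuture<?>[0]);
        // allOf completes once all inputs are done; it fails if any of them failed.
        return CompletableFuture.allOf(inputs).thenApply(ignored -> {
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // does not block: every input is already complete
            }
            return results;
        });
    }
}
```

With this shape, a result handler attached to the combined future sees either the full list of results or a single failure, which is why "once all queries succeed or failed" in the Javadoc is imprecise.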




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-18 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45238310
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.FailedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *
+ * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
+ *
+ * This stream grouping may be used to optimise writes to Apache Cassandra.
+ */
+public class Murmur3StreamGrouping implements CustomStreamGrouping {
--- End diff --

Yes, that's right! Actually, I hadn't even noticed that there is no method like 
this on the component: customGrouping(componentId, customStreamGrouping, 
fields...).

So, one solution could be to pass the indexes of the partition keys as 
follows: 
myBolt.customGrouping("comp", new Murmur3StreamGrouping(0,1)) ?
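
A minimal sketch of that idea, assuming the grouping receives the positions of the partition key columns in its constructor. The class name IndexedKeyGrouping and the method chooseTask are hypothetical, and Arrays.hashCode stands in for the Murmur3 hash used in the PR so that the example needs only the JDK.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; this is not the PR's Murmur3StreamGrouping. */
public class IndexedKeyGrouping {

    private final int[] keyIndexes;

    /** @param keyIndexes positions of the partition key values within a tuple */
    public IndexedKeyGrouping(int... keyIndexes) {
        this.keyIndexes = keyIndexes.clone();
    }

    /** Picks a target task using only the values found at the partition key positions. */
    public int chooseTask(List<Integer> targetTasks, List<?> values) {
        Object[] key = new Object[keyIndexes.length];
        for (int i = 0; i < keyIndexes.length; i++) {
            key[i] = values.get(keyIndexes[i]);
        }
        // Stand-in hash (assumption): the PR hashes the key with Murmur3 instead.
        int hash = Arrays.hashCode(key);
        // floorMod keeps the index non-negative even when the hash is negative.
        return targetTasks.get(Math.floorMod(hash, targetTasks.size()));
    }
}
```

Two tuples that share the values at indexes 0 and 1 are sent to the same task regardless of their other fields, which is the property the batching bolt relies on.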




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45070726
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class BatchCassandraWriterBolt extends BaseCassandraBolt> {
+
+    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+    public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+    private static final int QUEUE_MAX_SIZE = 1000;
+
+    private LinkedBlockingQueue queue;
+
+    private int tickFrequencyInSeconds;
+
+    private long lastModifiedTimesMillis;
+
+    private String componentID;
+
+    private AsyncResultHandler> asyncResultHandler;
+
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     *
+     * @param tupleMapper
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+    }
+
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     *
+     * @param tupleMapper
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+        super(tupleMapper);
+        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(stormConfig, topologyContext, outputCollector);
+        this.componentID = topologyContext.getThisComponentId();
+        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+        this.lastModifiedTimesMillis = now();
+    }
+
+    @Override
+    protected AsyncResultHandler> getAsyncHandler() {
+        if( asyncResultHandler == null) {
+            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
+        }
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void process(Tuple input) {
+        if( ! queue.offer(input) ) {
+            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
+            prepareAndExecuteStatement();
+            queue.add(input);
+        }
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
--- End diff --

Which name should be used? onTickTuple, onSystemTuple...




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45039507
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.FailedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *
+ * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
+ *
+ * This stream grouping may be used to optimise writes to Apache Cassandra.
+ */
+public class Murmur3StreamGrouping implements CustomStreamGrouping {
--- End diff --

@harshach, there will be no performance issue using shuffle, field or 
murmur3. This strategy should be used with the BatchCassandraBolt in order to 
route all tuples that will be written to the same partition to the same Storm 
task.

In fact, to stream the tuples according to Cassandra data location, we would 
need to retrieve the IP addresses of each task within the CustomStreamGrouping. I 
don't know if that is possible?




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-10-31 Thread fhussonnois
Github user fhussonnois commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-152778819
  
I have updated the PR to address the remarks.




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-10-28 Thread fhussonnois
GitHub user fhussonnois opened a pull request:

https://github.com/apache/storm/pull/827

STORM-1075 add external module storm-cassandra

https://issues.apache.org/jira/browse/STORM-1075

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhussonnois/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/827.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #827


commit db148647bd28f16e796d2b6640657d9ea1adf3d2
Author: Florian Hussonnois 
Date:   2015-10-28T11:08:58Z

STORM-1075 add external module storm-cassandra



