[GitHub] storm pull request: STORM-1348 - refactor API to remove Insert/Upd...
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/929#issuecomment-165393591 @harshach can this PR be merged if the modifications are OK for you? Thank you. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-1348 - refactor API to remove Insert/Upd...
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/929#issuecomment-164410061 @satishd done ;)
[GitHub] storm pull request: STORM-1348 - refactor API to remove Insert/Upd...
GitHub user fhussonnois opened a pull request: https://github.com/apache/storm/pull/929 STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector I still need to add some tests, but I am opening this PR now so that the review can start. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhussonnois/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/929.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #929 commit 8eca74586ac852b89614f97ce5cd9429af7e7286 Author: Florian Hussonnois Date: 2015-12-06T21:23:29Z STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector
[GitHub] storm pull request: STORM-1211 Added trident support for Cassandra...
Github user fhussonnois commented on a diff in the pull request: https://github.com/apache/storm/pull/915#discussion_r46620267 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java --- @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.trident.state; + +import backtype.storm.task.IMetricsContext; +import backtype.storm.topology.FailedException; +import backtype.storm.tuple.Values; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.base.Preconditions; +import org.apache.storm.cassandra.client.SimpleClient; +import org.apache.storm.cassandra.client.SimpleClientProvider; +import org.apache.storm.cassandra.query.CQLResultSetValuesMapper; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.trident.operation.TridentCollector; +import storm.trident.state.State; +import storm.trident.tuple.TridentTuple; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class CassandraState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class); + +private final Map conf; +private final Options options; + +private Session session; +private SimpleClient client; + +public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) { +this.conf = conf; +this.options = options; +} + +public static final class Options implements Serializable { +private final SimpleClientProvider clientProvider; +private CQLStatementTupleMapper cqlStatementTupleMapper; +private CQLResultSetValuesMapper cqlResultSetValuesMapper; +private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED; + + +public Options(SimpleClientProvider clientProvider) { +this.clientProvider = clientProvider; +} + +public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) { +this.cqlStatementTupleMapper = cqlStatementTupleMapper; +return this; +} + +public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper 
cqlResultSetValuesMapper) { +this.cqlResultSetValuesMapper = cqlResultSetValuesMapper; +return this; +} + +public Options withBatching(BatchStatement.Type batchingType) { +this.batchingType = batchingType; +return this; +} + +} + +@Override +public void beginCommit(Long txid) { +LOG.debug("beginCommit is noop"); +} + +@Override +public void commit(Long txid) { +LOG.debug("commit is noop"); +} + +public void prepare() { +Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper"); + +client = options.clientProvider.getClient(conf); +session = client.connect(); +} + +public void cleanup() { +session.close(); +client.close(); +} + +public void updateState(List tuples, final TridentCollector collector) { --- End diff -- OK, I understand your point, but this may lead to issues at scale. Writes are usually idempotent in Cassandra, so even if a write fails you could resend all the tuples. Could we imagine two strategies to update the state?
[GitHub] storm pull request: STORM-1211 Added trident support for Cassandra...
Github user fhussonnois commented on a diff in the pull request: https://github.com/apache/storm/pull/915#discussion_r46584477 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java --- @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.trident.state; + +import backtype.storm.task.IMetricsContext; +import backtype.storm.topology.FailedException; +import backtype.storm.tuple.Values; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.base.Preconditions; +import org.apache.storm.cassandra.client.SimpleClient; +import org.apache.storm.cassandra.client.SimpleClientProvider; +import org.apache.storm.cassandra.query.CQLResultSetValuesMapper; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.trident.operation.TridentCollector; +import storm.trident.state.State; +import storm.trident.tuple.TridentTuple; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class CassandraState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class); + +private final Map conf; +private final Options options; + +private Session session; +private SimpleClient client; + +public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) { +this.conf = conf; +this.options = options; +} + +public static final class Options implements Serializable { +private final SimpleClientProvider clientProvider; +private CQLStatementTupleMapper cqlStatementTupleMapper; +private CQLResultSetValuesMapper cqlResultSetValuesMapper; +private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED; + + +public Options(SimpleClientProvider clientProvider) { +this.clientProvider = clientProvider; +} + +public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) { +this.cqlStatementTupleMapper = cqlStatementTupleMapper; +return this; +} + +public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper 
cqlResultSetValuesMapper) { +this.cqlResultSetValuesMapper = cqlResultSetValuesMapper; +return this; +} + +public Options withBatching(BatchStatement.Type batchingType) { +this.batchingType = batchingType; +return this; +} + +} + +@Override +public void beginCommit(Long txid) { +LOG.debug("beginCommit is noop"); +} + +@Override +public void commit(Long txid) { +LOG.debug("commit is noop"); +} + +public void prepare() { +Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper"); + +client = options.clientProvider.getClient(conf); +session = client.connect(); +} + +public void cleanup() { +session.close(); +client.close(); +} + +public void updateState(List tuples, final TridentCollector collector) { --- End diff -- I think async writes should be used to update the state. In Cassandra, batch statements must not be used to improve write performance. In addition, Cassandra will warn if the batch size is too large, and this may overload the cluster. Batches must onl
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/827#issuecomment-161422378 Thank you @harshach @satishd. I will start working on https://issues.apache.org/jira/browse/STORM-1348 as soon as possible. BTW, will my name be added to the contributors list? ;)
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/827#issuecomment-161103792 @harshach @satishd I added an ignore annotation to skip these tests.
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/827#issuecomment-160937526 @harshach I can't reproduce the error. All tests pass successfully. Did you run the tests after merging the branch?
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/827#issuecomment-159316692

@satishd, @harshach ok perfect. So I will refactor the API to remove the Insert/Update builder. Then I will add a simple interface to map tuples to columns:

```java
public interface CqlMapper extends Serializable {
    List getColumns(ITuple tuple);
}
```

In addition, I will implement a SimpleStatementMapper to build queries as follows:

```java
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO weather.temperature (weatherstation_id, event_time, temperature) VALUES(?,?,?)"),
        with(field("weatherstation_id"), field("event_time").now(), field("temperature"))
    )
);
```

The same bolt could be written with QueryBuilder:

```java
new CassandraWriterBolt(
    async(
        simple(
            QueryBuilder.insertInto("weather", "temperature")
                .value("weatherstation_id", "?")
                .value("event_time", "?")
                .value("temperature", "?"),
            with(field("weatherstation_id"), field("event_time").now(), field("temperature")))
    )
);
```

What do you think about that?
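The tuple-to-column mapping proposed in this comment can be sketched in a self-contained way. This is only an illustration under stated assumptions: the `Column` class, the `CqlMapperSketch` and `FieldSelector` names, and the use of a plain `Map<String, Object>` in place of Storm's `ITuple` are hypothetical and exist only so the example compiles without Storm or the DataStax driver on the classpath. The field names come from the `weather.temperature` example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical column holder: a column name paired with the value to bind. */
final class Column implements Serializable {
    final String name;
    final Object value;

    Column(String name, Object value) {
        this.name = name;
        this.value = value;
    }
}

/** Same shape as the proposed CqlMapper, with a Map standing in for ITuple (assumption). */
interface CqlMapperSketch extends Serializable {
    List<Column> getColumns(Map<String, Object> tuple);
}

/** Hypothetical mapper that selects the named tuple fields, in order, as columns. */
final class FieldSelector implements CqlMapperSketch {
    private final String[] fields;

    FieldSelector(String... fields) {
        this.fields = fields.clone();
    }

    @Override
    public List<Column> getColumns(Map<String, Object> tuple) {
        List<Column> columns = new ArrayList<>();
        for (String field : fields) {
            columns.add(new Column(field, tuple.get(field)));
        }
        return columns;
    }
}

final class CqlMapperDemo {
    /** Builds a sample tuple and maps two of its fields to columns. */
    static List<Column> demo() {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("weatherstation_id", "station-1");
        tuple.put("event_time", 1449437009000L);
        tuple.put("temperature", 21.5);
        return new FieldSelector("weatherstation_id", "temperature").getColumns(tuple);
    }

    public static void main(String[] args) {
        for (Column c : demo()) {
            System.out.println(c.name + " = " + c.value);
        }
    }
}
```

Keeping the mapper as a single-method `Serializable` interface means a bolt can carry it across workers, and users with unusual needs can supply their own implementation instead of a DSL.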
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/827#issuecomment-158930102 @satishd I think the fluent API handles the common use cases. Its purpose is to ease the Cassandra integration, because creating a CQL statement from a tuple is cumbersome. For specific needs, developers can still implement their own CQLStatementTupleMapper. So I think it would be bad to remove that API. In fact, there is a risk that developers who use the connector would have to re-develop a DSL. I'm not sure, but I think there are not a lot of changes to the Cassandra connector API. DataStax seems to focus its development on Cassandra internals rather than on query language support.
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on a diff in the pull request: https://github.com/apache/storm/pull/827#discussion_r45364971 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.executor; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Service to asynchronously executes cassandra statements. 
+ */ +public class AsyncExecutor implements Serializable { + +private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class); + +protected Session session; + +protected ExecutorService executorService; + +protected AsyncResultHandler handler; + +private Map, Boolean> pending = new ConcurrentHashMap<>( ); + +/** + * Creates a new {@link AsyncExecutor} instance. + */ +protected AsyncExecutor(Session session, AsyncResultHandler handler) { +this(session, newSingleThreadExecutor(), handler); +} + +/** + * Creates a new {@link AsyncExecutor} instance. + * + * @param session The cassandra session. + * @param executorService The executor service responsible to execute handler. + */ +private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler handler) { +this.session = session; +this.executorService = executorService; +this.handler = handler; +} + +protected static ExecutorService newSingleThreadExecutor() { +return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build()); +} + +/** + * Asynchronously executes all statements associated to the specified input. The input will be passed to + * the {@link #handler} once all queries succeed or failed. --- End diff -- Yes, there is an issue with the comment. This behavior is implemented with http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/Futures.html#allAsList(java.lang.Iterable)
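Guava's `Futures.allAsList` returns a future that yields all values when every input succeeds and fails as soon as any input fails, which is why "once all queries succeed or failed" is not quite accurate. The following is a minimal sketch of that behavior, assuming the JDK's `CompletableFuture` as a stand-in for Guava's `ListenableFuture`; the class and method names are hypothetical and this is not the code used in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllAsListSketch {

    /** Completes with all values in input order, or fails as soon as any input fails. */
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        // Fail fast: the first failed input fails the combined future.
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }
        // Success path: runs only when every input completed normally.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenRun(() -> {
                List<T> values = new ArrayList<>();
                for (CompletableFuture<T> future : futures) {
                    values.add(future.join());
                }
                result.complete(values);
            });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = CompletableFuture.completedFuture("row-1");
        CompletableFuture<String> second = CompletableFuture.completedFuture("row-2");
        System.out.println(allAsList(Arrays.asList(first, second)).join());
    }
}
```

With this shape, a result handler attached to the combined future sees either the full list of results or the first failure, without waiting for the remaining statements.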
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on a diff in the pull request: https://github.com/apache/storm/pull/827#discussion_r45238310 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java --- @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra; + +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.task.WorkerTopologyContext; +import backtype.storm.topology.FailedException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +/** + * + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple. + * + * This stream grouping may be used to optimise writes to Apache Cassandra. + */ +public class Murmur3StreamGrouping implements CustomStreamGrouping { --- End diff -- Yes, that's right! 
Actually, I didn't even notice that there is no method like this on the component: customGrouping(componentId, customStreamGrouping, fields...). So, one solution could be to pass the indexes of the partition keys as follows: myBolt.customGrouping("comp", new Murmur3StreamGrouping(0,1))?
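The idea of passing partition key indexes to the grouping can be sketched as follows. This is a hedged illustration, not the PR's implementation: the class name is hypothetical, it does not implement Storm's `CustomStreamGrouping` (it only mirrors the shape of `chooseTasks`), and `Arrays.hashCode` is used as a stand-in for the Murmur3 hash so that the example needs nothing beyond the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical grouping that routes tuples by the values at the partition key indexes. */
final class PartitionKeyGroupingSketch {
    private final List<Integer> targetTasks;
    private final int[] partitionKeyIndexes;

    PartitionKeyGroupingSketch(List<Integer> targetTasks, int... partitionKeyIndexes) {
        this.targetTasks = new ArrayList<>(targetTasks);
        this.partitionKeyIndexes = partitionKeyIndexes.clone();
    }

    /** Picks one target task from the values found at the partition key indexes. */
    List<Integer> chooseTasks(List<Object> values) {
        Object[] key = new Object[partitionKeyIndexes.length];
        for (int i = 0; i < key.length; i++) {
            key[i] = values.get(partitionKeyIndexes[i]);
        }
        // Stand-in hash (assumption): the real grouping would hash the key with Murmur3.
        int hash = Arrays.hashCode(key);
        // floorMod keeps the slot non-negative even for negative hash codes.
        int slot = Math.floorMod(hash, targetTasks.size());
        return Collections.singletonList(targetTasks.get(slot));
    }

    public static void main(String[] args) {
        PartitionKeyGroupingSketch grouping =
            new PartitionKeyGroupingSketch(Arrays.asList(3, 4, 5), 0, 1);
        System.out.println(grouping.chooseTasks(Arrays.<Object>asList("station-1", 2015, 20.0)));
    }
}
```

Because only the values at the given indexes feed the hash, two tuples with the same partition key always reach the same task, whatever their other fields contain.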
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on a diff in the pull request: https://github.com/apache/storm/pull/827#discussion_r45070726 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java --- @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.bolt; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.Time; +import com.datastax.driver.core.Statement; +import org.apache.storm.cassandra.executor.AsyncResultHandler; +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class BatchCassandraWriterBolt extends BaseCassandraBolt> { + +private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class); + +public static final int DEFAULT_EMIT_FREQUENCY = 2; + +private static final int QUEUE_MAX_SIZE = 1000; + +private LinkedBlockingQueue queue; + +private int tickFrequencyInSeconds; + +private long lastModifiedTimesMillis; + +private String componentID; + +private AsyncResultHandler> asyncResultHandler; + +/** + * Creates a new {@link CassandraWriterBolt} instance. + * + * @param tupleMapper + */ +public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) { +this(tupleMapper, DEFAULT_EMIT_FREQUENCY); +} + +/** + * Creates a new {@link CassandraWriterBolt} instance. 
+ * + * @param tupleMapper + */ +public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) { +super(tupleMapper); +this.tickFrequencyInSeconds = tickFrequencyInSeconds; +} +/** + * {@inheritDoc} + */ +@Override +public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) { +super.prepare(stormConfig, topologyContext, outputCollector); +this.componentID = topologyContext.getThisComponentId(); +this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); +this.lastModifiedTimesMillis = now(); +} + +@Override +protected AsyncResultHandler> getAsyncHandler() { +if( asyncResultHandler == null) { +asyncResultHandler = new BatchAsyncResultHandler(getResultHandler()); +} +return asyncResultHandler; +} + +/** + * {@inheritDoc} + */ +@Override +protected void process(Tuple input) { +if( ! queue.offer(input) ) { +LOG.info(logPrefix() + "The message queue is full, preparing batch statement..."); +prepareAndExecuteStatement(); +queue.add(input); +} +} +/** + * {@inheritDoc} + */ +@Override +protected void tick() { --- End diff -- What names should be used? onTickTuple, onSystemTuple...
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on a diff in the pull request: https://github.com/apache/storm/pull/827#discussion_r45039507 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java --- @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra; + +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.task.WorkerTopologyContext; +import backtype.storm.topology.FailedException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +/** + * + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple. + * + * This stream grouping may be used to optimise writes to Apache Cassandra. + */ +public class Murmur3StreamGrouping implements CustomStreamGrouping { --- End diff -- @harshach, there will be no performance issue using shuffle, field or murmur3. 
This strategy should be used with the BatchCassandraBolt in order to send all tuples that will be written to the same partition to the same Storm task. In fact, to route the tuples according to Cassandra's data location, we would need to retrieve the IP address of each task within the CustomStreamGrouping. I don't know if that is possible.
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
Github user fhussonnois commented on the pull request: https://github.com/apache/storm/pull/827#issuecomment-152778819 I have updated the PR to address the remarks.
[GitHub] storm pull request: STORM-1075 add external module storm-cassandra
GitHub user fhussonnois opened a pull request: https://github.com/apache/storm/pull/827 STORM-1075 add external module storm-cassandra https://issues.apache.org/jira/browse/STORM-1075 You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhussonnois/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/827.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #827 commit db148647bd28f16e796d2b6640657d9ea1adf3d2 Author: Florian Hussonnois Date: 2015-10-28T11:08:58Z STORM-1075 add external module storm-cassandra