[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14714441#comment-14714441 ]
ASF GitHub Bot commented on STORM-855:
--------------------------------------

Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/694#discussion_r38002218

    --- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
    @@ -18,23 +18,23 @@
       (:import [backtype.storm.generated Grouping]
                [java.io Serializable])
       (:use [backtype.storm util config log timer stats])
    -  (:import [java.util List Random HashMap ArrayList LinkedList Map])
    +  (:import [java.util List Random HashMap ArrayList Map])
       (:import [backtype.storm ICredentialsListener])
    -  (:import [backtype.storm.hooks ITaskHook])
    -  (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId])
    +  (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId Batch])
       (:import [backtype.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
       (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
       (:import [backtype.storm.grouping CustomStreamGrouping])
       (:import [backtype.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
       (:import [backtype.storm.generated GlobalStreamId])
    -  (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time])
    +  (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue])
       (:import [com.lmax.disruptor InsufficientCapacityException])
    -  (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
    +  (:import [backtype.storm.serialization KryoTupleSerializer KryoBatchSerializer KryoTupleBatchSerializer KryoTupleBatchDeserializer])
       (:import [backtype.storm.daemon Shutdownable])
    -  (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
    +  (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
       (:import [backtype.storm Config Constants])
    -  (:import [java.util.concurrent ConcurrentLinkedQueue])
    +  (:import [java.util.concurrent ConcurrentLinkedQueue]
    +           (backtype.storm.messaging TaskMessage))
    --- End diff --

    It would be nice to make this a vector, for consistency.


> Add tuple batching
> ------------------
>
>                 Key: STORM-855
>                 URL: https://issues.apache.org/jira/browse/STORM-855
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> In order to increase Storm's throughput, multiple tuples can be grouped
> together in a batch of tuples (i.e., a fat-tuple) and transferred from producer
> to consumer at once.
> The initial idea is taken from https://github.com/mjsax/aeolus. However, we
> aim to integrate this feature deep into the system (in contrast to building
> it on top), which has multiple advantages:
>   - batching can be even more transparent to the user (e.g., no extra
> direct-streams needed to mimic Storm's data distribution patterns)
>   - fault-tolerance (anchoring/acking) can be done on a tuple granularity
> (not on a batch granularity, which leads to many more replayed tuples -- and
> result duplicates -- in case of failure)
> The aim is to extend the TopologyBuilder interface with an additional
> parameter 'batch_size' to expose this feature to the user. By default,
> batching will be disabled.
> This batching feature serves purely as a tuple transport mechanism, i.e.,
> tuple-by-tuple processing semantics are preserved. An output batch is
> assembled at the producer and completely disassembled at the consumer. The
> consumer output can be batched again, independently of whether its input was
> batched. Thus, batches can be of different size for each producer-consumer
> pair. Furthermore, consumers can receive batches of different size from
> different producers (including regular non-batched input).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)