[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15219610#comment-15219610 ]

ASF GitHub Bot commented on STORM-855:
--
Github user mjsax closed the pull request at: https://github.com/apache/storm/pull/694

> Add tuple batching
> ------------------
>
> Key: STORM-855
> URL: https://issues.apache.org/jira/browse/STORM-855
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> In order to increase Storm's throughput, multiple tuples can be grouped together in a batch of tuples (i.e., a fat tuple) and transferred from producer to consumer at once.
> The initial idea is taken from https://github.com/mjsax/aeolus. However, we aim to integrate this feature deeply into the system (in contrast to building it on top), which has multiple advantages:
> - batching can be even more transparent to the user (e.g., no extra direct streams are needed to mimic Storm's data distribution patterns)
> - fault tolerance (anchoring/acking) can be done at tuple granularity (rather than batch granularity, which leads to many more replayed tuples -- and result duplicates -- in case of failure)
> The aim is to extend the TopologyBuilder interface with an additional parameter 'batch_size' to expose this feature to the user. By default, batching is disabled.
> This batching feature serves purely as a tuple transport mechanism, i.e., tuple-by-tuple processing semantics are preserved. An output batch is assembled at the producer and completely disassembled at the consumer. The consumer's output can be batched again, independently of whether its input was batched. Thus, batches can be of a different size for each producer-consumer pair. Furthermore, consumers can receive batches of different sizes from different producers (including regular non-batched input).
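For illustration, a minimal sketch of where the proposed `batch_size` parameter would attach. The `TopologyBuilder` wiring below is real Storm 0.x API, but the batch-size overload in the comment is an assumption -- the proposal was never merged (the PR was closed in favor of #765):

```java
import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class BatchSizeSketch {
    public static TopologyBuilder build() {
        // Topology wiring as it exists today, using Storm's bundled test components.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 1);
        builder.setBolt("count", new TestWordCounter(), 10)
               .shuffleGrouping("words");
        // Proposed (hypothetical, never merged): an extra batch_size argument, e.g.
        //   builder.setBolt("count", new TestWordCounter(), 10, /* batch_size */ 100);
        // with batching disabled by default when the argument is omitted.
        return builder;
    }
}
```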
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15219609#comment-15219609 ]

ASF GitHub Bot commented on STORM-855:
--
Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-203830224

Closing this (also closing the JIRA) as Bobby's work (https://github.com/apache/storm/pull/765) got merged and I don't have time to work on this right now.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14984232#comment-14984232 ]

ASF GitHub Bot commented on STORM-855:
--
Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/838#issuecomment-152785729

@knusbaum Seems like the wrong issue is associated; STORM-885 would be the proper one.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983537#comment-14983537 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559976

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -510,8 +546,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
     throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " + connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
-}
+}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983535#comment-14983535 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559969

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -495,7 +531,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
     connectionAttempt);
     if (messagesLost.get() > 0) {
         LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
-}
+}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983533#comment-14983533 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559957

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -386,11 +405,10 @@ private void waitForPendingMessagesToBeSent() {
     } catch (InterruptedException e) {
         break;
-}
 }
- }
+}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983527#comment-14983527 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559925

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -281,6 +296,10 @@ private Channel getConnectedChannel() {
     }
     return null;
 }
+}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983529#comment-14983529 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559934

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -292,7 +311,7 @@ private void dropMessages(Iterator msgs) {
     // We consume the iterator by traversing and thus "emptying" it.
     int msgCount = iteratorSize(msgs);
     messagesLost.getAndAdd(msgCount);
-}
+}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983532#comment-14983532 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559947

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -375,7 +394,7 @@ private void waitForPendingMessagesToBeSent() {
     long totalPendingMsgs = pendingMessages.get();
     long startMs = System.currentTimeMillis();
     while (pendingMessages.get() != 0) {
-try {
+try {
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983530#comment-14983530 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559942

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -335,7 +354,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
     }
     });
-}
+}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983524#comment-14983524 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559884

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -202,7 +217,7 @@ public Status status() {
     @Override
     public Iterator recv(int flags, int clientId) {
         throw new UnsupportedOperationException("Client connection should not receive any messages");
-}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983526#comment-14983526 ]

ASF GitHub Bot commented on STORM-855:
--
Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43559892

--- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
@@ -266,7 +281,7 @@ public void send(Iterator msgs) {
     // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
     // because we know `Channel.isWritable` was false after the messages were already in the buffer.
     }
-}
--- End diff --

Spacing
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14983491#comment-14983491 ]

ASF GitHub Bot commented on STORM-855:
--
GitHub user knusbaum opened a pull request: https://github.com/apache/storm/pull/838

[STORM-855] Heartbeat Server (Pacemaker)

This pull request redirects worker heartbeats away from Zookeeper and into a new server, 'Pacemaker'. The redirection is accomplished by making `ClusterState` pluggable through the `ClusterStateFactory` interface.

By default, Pacemaker is not enabled. It can be enabled by setting `storm.cluster.state.store` from its default value of `"backtype.storm.cluster_state.zookeeper_state_factory"` to `"org.apache.storm.pacemaker.pacemaker_state_factory"`.

Pacemaker includes both digest-based and kerberos-based security, but it is primitive. Pacemaker is not yet HA; however, if Pacemaker fails, Nimbus will NOT start killing and reassigning workers, so Pacemaker going down won't bring down a cluster. It does need to be brought back up before new jobs can be submitted, though.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-855

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/838.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #838

commit 444ec05e5a9f38f9a9472c54b39f1371c839683b
Author: Kyle Nusbaum
Date: 2015-10-30T22:21:27Z

PACEMAKER OPEN SOURCE!
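For reference, the switch described above amounts to a single override in storm.yaml. A sketch follows; the key and both factory class names are quoted verbatim from the PR description, and only the YAML layout around them is assumed:

```
# storm.yaml -- route worker heartbeats to Pacemaker instead of Zookeeper.
# Default value, shown for comparison:
# storm.cluster.state.store: "backtype.storm.cluster_state.zookeeper_state_factory"
storm.cluster.state.store: "org.apache.storm.pacemaker.pacemaker_state_factory"
```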
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14967622#comment-14967622 ]

ASF GitHub Bot commented on STORM-855:
--
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/765#issuecomment-149987537

So I have been running a number of tests trying to come to a conclusive decision on how Storm should handle batching, and trying to understand the difference between my test results and the test results from #694. I ran the word-count test I wrote as part of #805 on a 35-node Storm cluster. This was done against several different Storm versions: the baseline in the #805 pull request; this patch + #805 (batch-v2); and #694 + #805 + modifications to use the hybrid approach to enable acking and batching to work in a multi-process topology (STORM-855). To avoid having all of the numbers be hard to parse I am just going to include some charts, but if anyone wants to see the raw numbers or reproduce it themselves I am happy to provide data and/or branches.

The numbers below were collected after the topology had been running for at least 200 seconds. This is to avoid startup issues like JIT etc. I filtered out any 30-second interval where the measured throughput was not within +/-10% of the target throughput, on the assumption that if the topology could not keep up with the desired throughput, or was catching up from previous slowness, it would not be within that range. I did not filter based on the number of failures that happened, simply because that would have removed all of the STORM-855 results with batching enabled. None of the other test configurations saw any failures at all during testing.

![throughput-vs-latency](https://cloud.githubusercontent.com/assets/3441321/10644336/d0393222-77ed-11e5-849a-0b6be6ac5178.png)

This shows the 99%-ile latency vs measured throughput. It is not too interesting except to note that batching in STORM-855 at low throughput resulted in nothing being fully processed: all of the tuples timed out before they could finish. Only at a medium throughput above 16,000 sentences/second were we able to maintain enough tuples to complete batches regularly, but even then many tuples would still time out. This should be fixable with a batch timeout, but that is not implemented yet. To get a better view I adjusted the latency to be a log scale.

![throughput-vs-latency-log](https://cloud.githubusercontent.com/assets/3441321/10644335/d02ab29c-77ed-11e5-883e-a647f6b4279b.png)

From this we can see that on the very low end batch-v2 increases the 99%-ile latency from 5-10 ms to 19-21 ms. Most of that you can get back by configuring the batch size to 1 instead of the default 100 tuples. However, once the baseline stops functioning at around 7,000 sentences/sec, the batching code is able to continue working with either a batch size of 1 or 100. I believe this has to do with the automatic backpressure: in the baseline code backpressure does not take the overflow buffer into account, but in the batching code it does. I think this gives the topology more stability in maintaining a throughput, but I don't have any solid evidence for that.

I then zoomed in on the graphs to show what a 2-second SLA would look like

![throughput-vs-latency-2-sec](https://cloud.githubusercontent.com/assets/3441321/10644332/d0176f5c-77ed-11e5-98c4-d2e7a9e48c70.png)

and a 100 ms SLA.

![throughput-vs-latency-100-ms](https://cloud.githubusercontent.com/assets/3441321/10644334/d0291540-77ed-11e5-9fb3-9c9c97f504f9.png)

In both cases batching v2 with a batch size of 100 was able to handle the highest throughput for a given latency. Then I wanted to look at memory and CPU utilization.

![throughput-vs-mem](https://cloud.githubusercontent.com/assets/3441321/10644337/d03c3094-77ed-11e5-8cda-cf53fe3a2389.png)

Memory does not show much. The amount of memory used varies a bit from one version to another, but realize this is for 35 worker processes, so it varies from about 70 MB/worker to about 200 MB/worker. The numbers simply show that as the throughput increases the memory utilization does too, and it does not vary too much from one implementation to another.

![throughput-vs-cpu](https://cloud.githubusercontent.com/assets/3441321/10645834/6ba799e0-77f5-11e5-88fd-7e09475a5b6c.png)

CPU, however, shows that on the low end we go from 7 or 8 cores' worth of CPU time to about 35 cores' worth for the batching code. This seems to be the result of the batch-flushing threads waking up periodically. We should be able to mitigate this by making that interval larger, but that would in turn impact latency. I bel
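A minimal sketch of the steady-state filter described in the comment above -- keep only 30-second intervals whose measured throughput is within +/-10% of the target. The list-of-doubles input shape and the class/method names are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalFilter {
    // Keep only the per-interval throughput samples within +/-10% of the
    // target rate; everything else is treated as ramp-up or catch-up noise.
    public static List<Double> keepSteadyState(List<Double> measured, double target) {
        List<Double> kept = new ArrayList<>();
        for (double m : measured) {
            if (m >= 0.9 * target && m <= 1.1 * target) {
                kept.add(m);
            }
        }
        return kept;
    }
}
```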
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959782#comment-14959782 ]

ASF GitHub Bot commented on STORM-855:
--
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/765#issuecomment-148544657

@mjsax What I saw when testing STORM-855 was that the maximum throughput was cut almost in half, from 10,000 sentences per second to 5,500. But your numbers showed maximum throughput more than doubling, from around 7,960,300 tuples sent in 30 seconds to 16,347,100 in the same time period (no acking), and from 1,832,160 in 30 seconds to 2,323,580 -- an increase of 25% -- with acking. To me this feels like a contradiction. The only thing I can think of is that the messaging layer is so scary slow that cutting the maximum throughput of a worker by half has no impact on the overall performance if it can double the throughput of the messaging layer by doing more batching. This is likely the case, as on the high end 16,347,100 / 30 seconds / 24 workers is about 22,000 tuples per second per worker, whereas 5,500 sentences per second results in about 181,500 total tuples per second per worker being processed. I'm just looking for feedback from others on this, but it looks like I need to do a distributed apples-to-apples comparison as well to see the impact the messaging layer has.
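The per-worker arithmetic behind that comparison, spelled out. The ~33 tuples-per-sentence expansion factor is inferred from the quoted figures (181,500 / 5,500), not stated in the comment:

```java
public class ThroughputArithmetic {
    public static void main(String[] args) {
        // Messaging-layer view (no acking, 24 workers): tuples sent in 30 s.
        double perWorkerPerSec = 16_347_100.0 / 30 / 24;
        System.out.printf("~%.0f tuples/s/worker over the wire%n", perWorkerPerSec); // ~22,700

        // Processing view: 5,500 sentences/s yielding ~181,500 tuples/s per worker
        // implies roughly 33 tuples per sentence (inferred: 181,500 / 5,500 = 33).
        System.out.println(181_500.0 / 5_500); // 33.0
    }
}
```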
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959747#comment-14959747 ]

ASF GitHub Bot commented on STORM-855:
--
Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/765#issuecomment-148540526

I had a look at the numbers. It's a lot of stuff and hard for me to parse... I am not sure what you mean by "contradict the numbers"? Can you explain in more detail? However, I agree with your thought about STORM-855 (#694): it basically reduces the contention on the output queue, because fewer calls are made there. I profiled Storm once and observed that at high-end data rates (when we hit the wall) the contention on the output queue is the bottleneck (the writing and reading threads have to lock the queue, and waiting for the lock consumes a fair share of the time).
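To make the contention argument concrete, an illustrative sketch -- not Storm's code (Storm's inter-thread queue is the LMAX Disruptor; the plain `BlockingQueue` below is only a stand-in). Batching N tuples into one queue element turns N contended enqueue operations into one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class BatchedEnqueueSketch {
    static final int BATCH_SIZE = 100;

    // One queue.put() per BATCH_SIZE tuples instead of one per tuple, which
    // is the reduction in output-queue contention described in the comment.
    public static void enqueueBatched(BlockingQueue<List<Object>> queue,
                                      Iterable<Object> tuples) throws InterruptedException {
        List<Object> batch = new ArrayList<>(BATCH_SIZE);
        for (Object tuple : tuples) {
            batch.add(tuple);
            if (batch.size() == BATCH_SIZE) {
                queue.put(batch);                  // one contended operation per 100 tuples
                batch = new ArrayList<>(BATCH_SIZE);
            }
        }
        if (!batch.isEmpty()) {
            queue.put(batch);                      // flush the partial tail batch
        }
    }
}
```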
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952055#comment-14952055 ]

ASF GitHub Bot commented on STORM-855:
--
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/765#issuecomment-147125596

Sorry, I forgot to mention that for one test STORM-855-100 was able to run at a throughput of 6000 sentences/second successfully, but in other runs it failed, so I set the number to 5500, even though a 6000 is shown in the results.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952050#comment-14952050 ]

ASF GitHub Bot commented on STORM-855:
--
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/765#issuecomment-147124664

I have some new test results. I did a comparison of several different branches: this branch, the upgraded-disruptor branch #750, STORM-855 #694, and apache-master 0.11.0-SNAPSHOT (04cf3f6162ce6fdd1ec13b758222d889dafd5749). I had to make a few modifications to get my test to work. I applied the following patch https://gist.github.com/revans2/84301ef0fde0dc4fbe44 to each of the branches. For STORM-855 I had to modify the test a bit so it would optionally do batching; in that case batching was enabled on all streams and all spouts and bolts. I then ran the test at various throughputs (100, 200, 400, 800, 1600, 3200, 6400, 10000, 12800, 25600, and possibly a few others when looking for the maximum throughput) and different batch sizes. Each test ran for 5 mins. Here are the results of that test, excluding the tests where the worker could not keep up with the rate.

| 99%-ile ns | 99.9%-ile ns | throughput | branch-batch | mean latency ns | avg service latency ms | std-dev ns |
|---|---|---|---|---|---|---|
| 2,613,247 | 4,673,535 | 100 | STORM-855-0 | 2,006,347.25 | 1.26 | 2,675,778.36 |
| 2,617,343 | 4,423,679 | 200 | STORM-855-0 | 1,991,238.45 | 1.29 | 2,024,687.45 |
| 2,623,487 | 5,619,711 | 400 | STORM-855-0 | 1,999,926.81 | 1.24 | 1,778,335.92 |
| 2,627,583 | 4,603,903 | 1600 | STORM-855-0 | 1,971,888.24 | 1.30 | 893,085.40 |
| 2,635,775 | 8,560,639 | 800 | STORM-855-0 | 2,010,286.65 | 1.35 | 2,134,795.12 |
| 2,654,207 | 302,252,031 | 3200 | STORM-855-0 | 2,942,360.75 | 2.13 | 16,676,136.60 |
| 2,684,927 | 124,190,719 | 3200 | batch-v2-1 | 2,154,234.45 | 1.41 | 6,219,057.66 |
| 2,701,311 | 349,700,095 | 5000 | batch-v2-1 | 2,921,661.67 | 1.78 | 18,274,805.30 |
| 2,715,647 | 7,356,415 | 100 | storm-base-1 | 2,092,991.53 | 1.30 | 2,447,956.21 |
| 2,723,839 | 4,587,519 | 400 | storm-base-1 | 2,082,835.21 | 1.31 | 1,978,424.49 |
| 2,723,839 | 6,049,791 | 100 | dist-upgraade-1 | 2,091,407.68 | 1.31 | 2,222,977.89 |
| 2,725,887 | 10,403,839 | 1600 | batch-v2-1 | 2,010,694.30 | 1.27 | 2,095,223.90 |
| 2,725,887 | 4,607,999 | 200 | storm-base-1 | 2,074,784.50 | 1.30 | 1,951,564.93 |
| 2,727,935 | 4,513,791 | 200 | dist-upgraade-1 | 2,082,025.31 | 1.33 | 2,057,591.08 |
| 2,729,983 | 4,182,015 | 400 | dist-upgraade-1 | 2,056,282.29 | 1.43 | 862,428.67 |
| 2,732,031 | 4,632,575 | 800 | storm-base-1 | 2,092,514.39 | 1.27 | 2,231,550.66 |
| 2,734,079 | 4,472,831 | 800 | dist-upgraade-1 | 2,095,994.08 | 1.28 | 1,870,953.62 |
| 2,740,223 | 4,192,255 | 200 | batch-v2-1 | 2,011,025.19 | 1.21 | 911,556.19 |
| 2,742,271 | 4,726,783 | 1600 | storm-base-1 | 2,089,581.40 | 1.35 | 2,410,668.79 |
| 2,748,415 | 4,444,159 | 400 | batch-v2-1 | 2,055,600.78 | 1.34 | 1,729,257.92 |
| 2,748,415 | 4,575,231 | 100 | batch-v2-1 | 2,035,920.21 | 1.31 | 1,213,874.52 |
| 2,754,559 | 16,875,519 | 1600 | dist-upgraade-1 | 2,098,441.13 | 1.35 | 2,279,870.41 |
| 2,754,559 | 3,969,023 | 800 | batch-v2-1 | 2,026,222.88 | 1.29 | 767,491.71 |
| 2,793,471 | 53,477,375 | 3200 | storm-base-1 | 2,147,360.05 | 1.42 | 3,668,366.37 |
| 2,801,663 | 147,062,783 | 3200 | dist-upgraade-1 | 2,358,863.31 | 1.59 | 7,574,577.81 |
| 13,344,767 | 180,879,359 | 6400 | batch-v2-100 | 11,319,553.69 | 10.62 | 7,777,381.54 |
| 13,369,343 | 15,122,431 | 3200 | batch-v2-100 | 10,699,832.23 | 10.02 | 1,623,949.38 |
| 13,418,495 | 15,392,767 | 800 | batch-v2-100 | 10,589,813.17 | 9.86 | 2,439,134.80 |
| 13,426,687 | 14,680,063 | 400 | batch-v2-100 | 10,738,973.68 | 10.03 | 2,298,229.99 |
| 13,484,031 | 14,368,767 | 200 | batch-v2-100 | 10,941,653.28 | 10.20 | 2,471,899.43 |
| 13,508,607 | 14,262,271 | 100 | batch-v2-100 | 11,099,257.68 | 10.35 | 1,658,054.66 |
| 13,524,991 | 14,376,959 | 1600 | batch-v2-100 | 10,723,471.83 | 10.00 | 1,477,621.07 |
| 346,554,367 | 977,272,831 | 12800 | batch-v2-100 | 18,596,303.93 | 15.59 | 78,326,501.83 |
| 710,934,527 | 827,326,463 | 4000 | STORM-855-100 | 351,305,653.90 | 339.28 | 141,283,307.30 |
| 783,286,271 | 1,268,776,959 | 5000 | STORM-855-100 | 332,417,358.65 | 312.07 | 139,760,316.82 |
| 888,668,159 | 1,022,361,599 | 3200 | STORM-855-100 | 445,646,342.60 | 431.55 | 179,065,279.65 |
| 940,048,383 | 1,363,148,799 | 6400 | storm-base-1 | 20,225,300.17 | 17.17 | 134,848,974.52 |
| 1,043,333,119 | 1,409,286,143 | 10000 | batch-v2-1 | 22,750,840.18 | 6.13 | 146,235,076.73 |
| 1,209,008,127 | 1,786,773,503
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943400#comment-14943400 ]

ASF GitHub Bot commented on STORM-855:
--
Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-145539778

Thx. Take your time; there is actually no rush. I was just curious :)
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943383#comment-14943383 ]

ASF GitHub Bot commented on STORM-855:
--
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-145534243

I still need to run some tests. I am way behind on my open source commitments. I will try really hard this week to play around with this and let you know the results.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943115#comment-14943115 ]

ASF GitHub Bot commented on STORM-855:
--
Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-145473220

Hi, what is the next step?
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14936048#comment-14936048 ]

ASF GitHub Bot commented on STORM-855:
--
Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-144211924

One question: there are a lot of changes in `storm-core/src/jvm/backtype/storm/generated/*` resulting from rebuilding those files with `genthrift.sh`. However, it seems to me that only the changes to `ComponentCommon.java` (and, I guess, to `storm-core/src/py/storm/ttypes.py`) are relevant. For all other classes in this package, it seems the changes are variable renamings only. (Files in which only the generation date changed are already excluded.) Can I safely revert those other files, too? Or might I break something?
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14935978#comment-14935978 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-144203098 I just pushed a new version (rebased to current master, squashed commits, etc.). The PR is in a much cleaner state now. Looking forward to your feedback. Batching now works with acks, and each output stream can have a different batch size. The ack streams (`__ack_init`, `__ack_ack`, and `__ack_fail`) are treated as regular streams from a batching point of view. Metrics and the event logger should work too (but I did not test it). If you want to test this, be aware that hybrid batching is **not supported** yet (for performance reasons -- at least for the non-acking case). Thus, right now you set the batch size via an `int` only if acking is disabled, or via a `HashMap`; the map must contain an entry for each output stream (including ack streams), and each batch size must be at least 1. (Once we decide how to proceed -- see the next paragraph -- this can be cleaned up.) The problem with hybrid batching is the serialization path in *executor.clj* at `mk-transfer-fn` and `start-batch-transfer->worker-handler!`. I already wrote the code for hybrid serialization but disabled it, ie, commented it out. Because I cannot register different serializers for different output streams, only a hybrid serializer could be used. However, the runtime binding to the correct method for TupleImpl or Batch reduced the throughput (see the numbers below). I am not sure if or how this could be resolved. On the other hand, a batch size of one does not carry a big performance penalty -- maybe it would be worthwhile to enable batching all the time (ie, even for the non-batching case, just with a batch size of one) to avoid the hybrid setup. I also did some network monitoring using `nmon`. It shows that batching does reduce the number of bytes actually transferred over the network. The perf tool computes rather than measures the number of transferred bytes (which is not quite accurate). Right now I don't have those numbers, but if you wish I could rerun the experiments and post them here.
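A minimal sketch of the per-stream batch-size contract described above -- the class and helper names are illustrative, not the PR's actual API; only the stream names and the one-entry-per-stream / size-at-least-1 rules come from this comment:

```java
import java.util.HashMap;
import java.util.Map;

public class BatchSizeConfigSketch {

    static Map<String, Integer> perStreamBatchSizes() {
        HashMap<String, Integer> batchSizes = new HashMap<>();
        batchSizes.put("default", 100);    // regular output stream
        batchSizes.put("__ack_init", 100); // ack streams are treated as regular
        batchSizes.put("__ack_ack", 100);  // streams from a batching point of view
        batchSizes.put("__ack_fail", 1);
        return batchSizes;
    }

    static void validate(Map<String, Integer> batchSizes, Iterable<String> outputStreams) {
        for (String stream : outputStreams) {
            Integer size = batchSizes.get(stream);
            // The map must contain an entry for each output stream ...
            if (size == null) {
                throw new IllegalArgumentException("missing batch size for stream " + stream);
            }
            // ... and each batch size must be at least 1.
            if (size < 1) {
                throw new IllegalArgumentException("batch size must be >= 1 for stream " + stream);
            }
        }
    }
}
```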
I collected the following numbers for non-acking and acking:

**NO ACKING** `--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeSec 40`

no batching:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443539268669  0             0            0.0
WAITING  1           96          11         11              11                    1443539298669  30000         6688960      21.263631184895832
WAITING  1           96          11         11              11                    1443539328669  30000         7518460      23.900540669759113
RUNNING  1           96          11         11              11                    1443539358669  30000         10428980     33.15283457438151
RUNNING  1           96          11         11              11                    1443539388669  30000         10395200     33.045450846354164
```

batch size = 1 (to measure overhead; about 5%):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443539471193  0             0            0.0
WAITING  1           96          11         11              11                    1443539501193  30000         3089120      9.820048014322916
WAITING  1           96          11         11              11                    1443539531193  30000         9134740      29.038556416829426
RUNNING  1           96          11         11              11                    1443539561193  30000         9502680      30.208206176757812
RUNNING  1           96          11         11              11                    1443539591193  30000         9672300      30.747413635253906
```

batch size = 100 (throughput improvement by about 85%):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443539658994  0             0            0.0
WAITING  1           96          11         11              11                    1443539688994  30000         8345560      26.529820760091145
WAITING  1           96          11         11              11                    1443539718994  30000         19876460     63.18556467692057
RUNNING  1           96          11         11              11                    1443539748994  30000         18229880     57.95122782389323
RUNNING  1           96          11         11              11                    1443539778994  30000         18294660     58.15715789794922
```

**ACKING** `--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeS
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14935209#comment-14935209 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-144071648 I see. It's a GitHub issue... Usually I rebase before updating a PR. This time I did not... Thanks for the quick response.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14935192#comment-14935192 ] ASF GitHub Bot commented on STORM-855: -- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-144066244 @mjsax the issue is with how you upmerge. If you just do a git merge or a git pull, GitHub can become confused because it thinks you are still based off the original commit, and it will include the commits from the upmerge. Alternatively, you can do a git rebase and replay all of your changes onto the new version instead of merging it in. This will clean it up, but before you do it, please make sure you have a backup of your changes: a rebase is a destructive write that can (and this has personally happened to me) delete all of your code. When you push to the GitHub repo, you will have to include a -f, because git by default does not like destructive writes and you have to force it to do them.
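A minimal command sequence for the rebase workflow described above (a sketch; it assumes the fork is the `origin` remote, the Apache repo is configured as `upstream`, and the PR branch is named `storm-855` -- all three names are illustrative):

```
# Back up the branch first; the rebase rewrites history destructively.
git branch storm-855-backup

# Replay the PR commits on top of the latest upstream master.
git fetch upstream
git rebase upstream/master storm-855

# Force-push, since the rewritten history no longer fast-forwards.
git push -f origin storm-855
```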
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14935175#comment-14935175 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-144060581 I just realized that some commits from other people got added to this PR. This confuses me. Can you guide me through the process Storm development follows here? I am not used to that...
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14906600#comment-14906600 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-142980140 Ok. Hope to get it done over the weekend...
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14906577#comment-14906577 ] ASF GitHub Bot commented on STORM-855: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-142977482 @mjsax Thanks for the quick reply. It would be great to get the ack issue fixed so that others can also run some tests.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14906575#comment-14906575 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-142977052 I did not have time yet. However, I am still on it... I have already done some more work for this PR that is not pushed yet (for example, support for different batch sizes for different output streams, and some "clean up" of my changes). I think the code has an overall cleaner design now. As a next step, I actually want to investigate why this PR gives only a factor-two throughput improvement (Aeolus gives factor five). But if you prefer, I can also work on the ack problem first to get a fully running PR for now. (I just disabled acking in my current tests ;))
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14906568#comment-14906568 ] ASF GitHub Bot commented on STORM-855: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-142975644 @mjsax did you get a chance to look at the ack tuples issue? This is going to be a great perf improvement; I would like to see it merged in.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14721538#comment-14721538 ] Matthias J. Sax commented on STORM-855: --- There are some additional results in the PR comments: https://github.com/apache/storm/pull/694 And more results are available at similar links: https://www2.informatik.hu-berlin.de/~saxmatti/storm-aeolus-benchmark/batchingBenchmark-spout-batching-16.pdf [so all numbers from 0 to 16]
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14721215#comment-14721215 ] Sriharsha Chintalapani commented on STORM-855: -- [~revans2] [~mjsax] Thanks for the details. [~revans2], do we have any base numbers that we can compare against this patch?
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14717924#comment-14717924 ] ASF GitHub Bot commented on STORM-855: -- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-135608325 acking does seem to be having some issues. The ack tuples don't seem to be batched, so the serialization code gets confused.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716669#comment-14716669 ] Robert Joseph Evans commented on STORM-855: --- [~mjsax] and [~sriharsha], Micro batching is happening for two different reasons and in two very distinct ways. In Trident the microbatch is logical only. The batch is used to divide the tuples into distinct logical groups that can be committed to a state store as a whole. The tuples still flow through storm one at a time. This feature, however, is intended to provide a knob that allows you to give up some latency for increased throughput. When profiling Storm, one of the things I notice is that a very large percentage of the CPU is spent context switching and waking up threads. This is great for very low latency processing, but imposes a huge per-tuple overhead. If we can batch the tuples together in a single data structure, we amortize the overhead of those context switches, and we can get increased throughput, at the cost of waiting for a batch to fill. After talking to Matthias, I have been thinking about this feature a lot, and I honestly don't know the best way to implement it. Flink uses a different mechanism, where the input buffer to each bolt equivalent waits to wake up the processing piece until a timeout occurs or a specific number of tuples is reached. This essentially micro-batches the input to each bolt dynamically (see the sketch below). I am starting to lean more towards that side for a few reasons. First, it allows for very fine grained tuning on a per-bolt basis. It makes knowing how congested a bolt or queue is much simpler. And the batching efficiency does not degrade with the depth of the topology: with output-side batching, each batch, as it flows through a grouping, is likely to be split into smaller and smaller batches, to the point that a batch contains a very small number of tuples. All of this feels like it would make STORM-815 much more likely to work well. That said, this PR has an implementation that will give users a benefit now. How about this: I will try to come up with a quick prototype for disruptor queue microbatching as an alternative, and we can see through actual benchmarks how close they are to each other performance-wise, and how much impact there is on the code base.
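A minimal sketch of the count-or-timeout input batching described above. This is neither Flink's nor Storm's actual code; the class name and the queue capacity are made up, and only the wake-up policy (flush on `maxBatch` tuples or on timeout, whichever comes first) reflects the discussion:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Wake the consumer when either maxBatch tuples have arrived or timeoutMs
// has elapsed, whichever comes first.
public class InputBatcher<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024);
    private final int maxBatch;
    private final long timeoutMs;

    public InputBatcher(int maxBatch, long timeoutMs) {
        this.maxBatch = maxBatch;
        this.timeoutMs = timeoutMs;
    }

    public void put(T tuple) throws InterruptedException {
        queue.put(tuple);
    }

    // Blocks for at most timeoutMs, then returns whatever has arrived
    // (possibly an empty list), never more than maxBatch tuples.
    public List<T> nextBatch() throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatch);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (batch.size() < maxBatch) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break;
            }
            T tuple = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (tuple == null) {
                break; // timeout: flush what we have
            }
            batch.add(tuple);
        }
        return batch;
    }
}
```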
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716323#comment-14716323 ] Matthias J. Sax commented on STORM-855: --- I have never worked with Trident myself, but as far as I understand, micro-batching there breaks tuple-by-tuple processing semantics. A batch of tuples is assembled at the source and piped through the topology; the batch stays a batch the whole time. This is quite different from my approach: tuples are only batched "under the hood", and tuple-by-tuple processing semantics are preserved. A batch is assembled at the output of an operator and disassembled at the consumer. The consumer does not need to batch its own output. Hence, batching is introduced on a per-operator basis, ie, output batching can be enabled and disabled independently for each operator (also allowing different batch sizes for different operators and different batch sizes for different output streams). Thus, latency might not increase as much, because the batch size can be adjusted at a fine granularity. Additionally, if a single tuple fails, only that single tuple needs to be replayed (and not the whole batch, as in Trident). Last but not least, [~revans2] encouraged me to contribute this feature. Please see here: https://mail-archives.apache.org/mod_mbox/storm-dev/201505.mbox/%3C55672973.9040809%40informatik.hu-berlin.de%3E
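A sketch of the assemble-at-producer / disassemble-at-consumer contract described above, under stated assumptions: the class is illustrative only and does not reproduce the PR's actual `Batch` class in `backtype.storm.tuple`.

```java
import java.util.ArrayList;
import java.util.List;

// Producer-side output batching: tuples are collected until the configured
// batch size is reached, then the whole batch is handed to the transfer
// layer at once. The consumer unpacks the batch and processes the tuples
// one by one, so tuple-by-tuple semantics are preserved.
public class OutputBatchSketch<T> {
    private final int batchSize;
    private final List<T> buffer;

    public OutputBatchSketch(int batchSize) {
        this.batchSize = batchSize;
        this.buffer = new ArrayList<>(batchSize);
    }

    // Called by the producer for every emitted tuple; returns the full batch
    // when it is ready for transfer, or null while the batch is still filling.
    public List<T> add(T tuple) {
        buffer.add(tuple);
        if (buffer.size() < batchSize) {
            return null;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

Consumer side, correspondingly, would completely disassemble the batch and invoke the bolt per tuple, e.g. `for (T tuple : batch) { bolt.execute(tuple); }`, independent of whether its own output is batched again.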
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14715568#comment-14715568 ] Sriharsha Chintalapani commented on STORM-855: -- [~mjsax] curious why we are introducing batching in the core API, since Trident already provides micro-batching.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14714441#comment-14714441 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r38002218

```diff
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -18,23 +18,23 @@
   (:import [backtype.storm.generated Grouping]
            [java.io Serializable])
   (:use [backtype.storm util config log timer stats])
-  (:import [java.util List Random HashMap ArrayList LinkedList Map])
+  (:import [java.util List Random HashMap ArrayList Map])
   (:import [backtype.storm ICredentialsListener])
-  (:import [backtype.storm.hooks ITaskHook])
-  (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId])
+  (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId Batch])
   (:import [backtype.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
   (:import [backtype.storm.grouping CustomStreamGrouping])
   (:import [backtype.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
   (:import [backtype.storm.generated GlobalStreamId])
-  (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time])
+  (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue])
   (:import [com.lmax.disruptor InsufficientCapacityException])
-  (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
+  (:import [backtype.storm.serialization KryoTupleSerializer KryoBatchSerializer KryoTupleBatchSerializer KryoTupleBatchDeserializer])
   (:import [backtype.storm.daemon Shutdownable])
-  (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
+  (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
   (:import [backtype.storm Config Constants])
-  (:import [java.util.concurrent ConcurrentLinkedQueue])
+  (:import [java.util.concurrent ConcurrentLinkedQueue]
+           (backtype.storm.messaging TaskMessage))
```

--- End diff --

It would be nice to make this a vector, for consistency.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711675#comment-14711675 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134682336 Here are some additional benchmark results with larger `--messageSize` (ie, 100 and 250). Those benchmarks were run on a 12 node cluster (with `nimbus.thrift.max_buffer_size: 33554432` and `worker.childopts: "-Xmx2g"`) as follows: `storm jar target/storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeSec 300` As you can observe, the output rate is slightly reduced for larger tuple sizes in all cases (which is expected due to larger serialization costs). The batching case with batch size 100 and `--messageSize 250` does not improve the output rate as significantly as before, because it hits the underlying 1Gbit Ethernet limit. In the beginning it starts promising; after about 2 minutes, performance goes down. Because tuples cannot be transferred over the network fast enough, they get buffered in main memory, which reaches its limit (assumption), and thus Storm slows down the spout. Not sure why performance goes down in each report. Any ideas?

`--messageSize 100`

**master branch**
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           144         0          11              0                     1440523358523  0             0            0.0
WAITING  1           144         11         11              11                    1440523388523  30000         6336040      20.14172871907552
WAITING  1           144         11         11              11                    1440523418523  30000         10554200     33.55089823404948
RUNNING  1           144         11         11              11                    1440523448523  30000         10005840     31.807708740234375
RUNNING  1           144         11         11              11                    1440523478523  30000         10514120     33.4234873453776
RUNNING  1           144         11         11              11                    1440523508523  30000         10430700     33.158302307128906
RUNNING  1           144         11         11              11                    1440523538523  30000         7608960      24.188232421875
RUNNING  1           144         11         11              11                    1440523568523  30000         10279460     32.67752329508463
RUNNING  1           144         11         11              11                    1440523598524  30001         10496260     33.36559974774929
RUNNING  1           144         11         11              11                    1440523628523  29999         10382860     33.00732328691556
RUNNING  1           144         11         11              11                    1440523658523  30000         10138280     32.22872416178385
RUNNING  1           144         11         11              11                    1440523688523  30000         10072940     32.02101389567057
RUNNING  1           144         11         11              11                    1440523718523  30000         10095820     32.09374745686849
```

**batching branch (no batching)**
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           144         0          11              0                     1440520763917  0             0            0.0
WAITING  1           144         11         11              11                    1440520793917  30000         4467900      14.203071594238281
WAITING  1           144         11         11              11                    1440520823917  30000         10616160     33.74786376953125
RUNNING  1           144         11         11              11                    1440520853917  30000         10473700     33.294995625813804
RUNNING  1           144         11         11              11                    1440520883917  30000         10556860     33.55935414632162
RUNNING  1           144         11         11              11                    1440520913917  30000         10580760     33.63533020019531
RUNNING  1           144         11         11              11                    1440520943917  30000         10367580     32.95764923095703
RUNNING  1           144         11         11              11                    1440520973917  30000         10646760     33.84513854980469
RUNNING  1           144         11         11              11                    1440521003917  30000         10750300     34.17428334554037
RUNNING  1           144         11         11              11                    1440521033917  30000         10607220     33.719444274902344
RUNNING  1           144         11         11              11                    1440521063917  30000         10456920     33.24165344238281
RUNNING  1           144         11         11              11                    1440521093917  30000         10108000     32.132466634114586
RUNNING  1           144         11         11              11                    1440521123917  30000         10576120     33.6205800374349
```

**batching branch (batch size: 100)**
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           144         0          11              0                     1440521
```
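(A rough sanity check on the Ethernet-limit explanation, using the batch-size-100 numbers at `--messageSize 100` reported earlier in the thread and counting payload bytes only: roughly 58-63 MB/s corresponds to about 0.49-0.53 Gbit/s on the wire. At the same tuple rate, 250-byte messages would need about 2.5x that, ie roughly 1.2-1.3 Gbit/s, which exceeds what 1Gbit Ethernet can carry even before framing and serialization overhead -- consistent with the network becoming the bottleneck and tuples backing up in memory.)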
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711551#comment-14711551 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134656876 I just checked some older benchmark results doing batching in user land, ie, on top of Storm (=> Aeolus). For that case, a batch size of 100 increased the spout output rate by a factor of 6 (instead of the factor 1.5 the benchmark above shows). The benchmark should yield more than 70M tuples per 30 seconds... (and not about 19M). Of course, batching is done a little differently now. In Aeolus, a fat-tuple is used as the batch; thus, the system sees only a single batch-tuple. Now the system sees all tuples, but emitting is delayed until the batch is full (this still saves the overhead of going through the disruptor for each tuple). However, we generate a tuple-ID for each tuple in the batch, instead of a single ID per batch. Not sure how expensive this is. Because acking was not enabled, it should not be too expensive, because the IDs do not have to be "registered" with the ackers (right?). As a further optimization, it might be a good idea not to batch whole tuples, but only `Values` and the tuple-ID: the `worker-context`, `task-id`, and `outstream-id` are the same for all tuples within a batch (see the sketch below). I will try this out and push a new version in the next days if it works.
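A sketch of that optimization under stated assumptions -- the class and field names (`CompactBatch`, `values`, `tupleIds`) are illustrative, not the PR's actual `Batch` class. The per-tuple payload shrinks to (values, tuple-ID), while the metadata shared by every tuple in the batch is stored exactly once:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative compact batch: per-tuple data is only (values, tuple id);
// the source task id and output stream id are common to the whole batch
// and therefore stored once instead of once per tuple.
public class CompactBatch {
    private final int sourceTaskId;   // same for all tuples in the batch
    private final String streamId;    // same for all tuples in the batch
    private final List<List<Object>> values = new ArrayList<>();
    private final List<Long> tupleIds = new ArrayList<>();

    public CompactBatch(int sourceTaskId, String streamId) {
        this.sourceTaskId = sourceTaskId;
        this.streamId = streamId;
    }

    public void add(List<Object> tupleValues, long tupleId) {
        values.add(tupleValues);
        tupleIds.add(tupleId);
    }

    public int size() { return values.size(); }
    public int getSourceTaskId() { return sourceTaskId; }
    public String getStreamId() { return streamId; }

    // On the consumer side, full tuples would be rebuilt by combining each
    // (values, id) pair with the shared (sourceTaskId, streamId) metadata.
}
```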
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710539#comment-14710539 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134468397 Excellent work. This looks very promising. I'll address your questions tomorrow.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710451#comment-14710451 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134450197 I just pushed some changes (I added a new commit, so you can better see what I changed):
- added type hints
- split tuple and batch serialization into separate classes
- assembled different "emit functions" in Clojure for the single-tuple and batch cases (to add more type hints; see the sketch after the numbers below)

I get the following results running on a 4 node cluster with parameters: `storm jar storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --bolt 3 --name test -l 1 -n 1 --messageSize 4 --workers 4 --spout 1 --testTimeSec 300`

Master branch:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           48          0          4               0                     1440466170638  0             0            0.0
WAITING  1           48          4          4               4                     1440466200638  30000         6122840      0.7785593668619791
WAITING  1           48          4          4               4                     1440466230638  30000         11565400     1.4706166585286458
RUNNING  1           48          4          4               4                     1440466260638  30000         11394040     1.4488271077473958
RUNNING  1           48          4          4               4                     1440466290638  30000         11718240     1.49005126953125
RUNNING  1           48          4          4               4                     1440466320638  30000         11615920     1.4770406087239583
RUNNING  1           48          4          4               4                     1440466350638  30000         11557380     1.4695968627929688
RUNNING  1           48          4          4               4                     1440466380638  30000         11581080     1.4726104736328125
RUNNING  1           48          4          4               4                     1440466410638  30000         11492600     1.4613596598307292
RUNNING  1           48          4          4               4                     1440466440638  30000         11413760     1.451334635417
RUNNING  1           48          4          4               4                     1440466470638  30000         11300580     1.4369430541992188
RUNNING  1           48          4          4               4                     1440466500638  30000         11368760     1.4456125895182292
RUNNING  1           48          4          4               4                     1440466530638  30000         11509820     1.463549296061198
```

Batching branch with batching disabled:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           48          0          4               0                     1440467016767  0             0            0.0
WAITING  1           48          4          4               4                     1440467046767  30000         7095940      0.9022954305013021
WAITING  1           48          4          4               4                     1440467076767  30000         11136640     1.416097005208
RUNNING  1           48          4          4               4                     1440467106767  30000         11159220     1.4189682006835938
RUNNING  1           48          4          4               4                     1440467136767  30000         7757660      0.9864374796549479
RUNNING  1           48          4          4               4                     1440467166767  30000         11375580     1.4464797973632812
RUNNING  1           48          4          4               4                     1440467196767  30000         11669980     1.4839146931966145
RUNNING  1           48          4          4               4                     1440467226767  30000         11344380     1.4425125122070312
RUNNING  1           48          4          4               4                     1440467256767  30000         11521460     1.4650293986002605
RUNNING  1           48          4          4               4                     1440467286767  30000         11401040     1.4497172037760417
RUNNING  1           48          4          4               4                     1440467316767  30000         11493700     1.461499532063802
RUNNING  1           48          4          4               4                     1440467346767  30000         11452680     1.4562835693359375
RUNNING  1           48          4          4               4                     1440467376767  30000         11148300     1.4175796508789062
```

Batching branch with a batch size of 100 tuples:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           48          1          4               0                     1440467461710  0             0            0.0
WAITING  1           48          4          4               4                     1440467491710  30000         11686000     1.4859517415364583
WAITING  1           48          4          4               4                     1440467521710  30000         18026640     2.292205810546875
RUNNING  1           48          4          4               4                     1440467551710  30000         17936300     2.2807184855143228
RUNNING  1           48          4          4               4                     1440467581710  30000         18969300     2.4120712280273438
RUNNING  1           48          4          4
```
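On the serializer split: the class names `KryoTupleSerializer`, `KryoBatchSerializer`, and `KryoTupleBatchSerializer` appear in the PR's import changes; everything else in this sketch (the interfaces, stub types, and method bodies) is an assumption, meant only to illustrate why dedicated per-case serializers beat one hybrid serializer that dispatches on the runtime type of every object crossing the transfer path:

```java
// Stand-ins for backtype.storm.tuple.TupleImpl and backtype.storm.tuple.Batch.
interface Transferable {}
final class TupleImplStub implements Transferable {}
final class BatchStub implements Transferable {}

interface Serializer<T> {
    byte[] serialize(T obj);
}

// Dedicated, monomorphic paths: the callee type is known statically.
final class TupleOnlySerializer implements Serializer<TupleImplStub> {
    public byte[] serialize(TupleImplStub t) { return new byte[0]; /* kryo write elided */ }
}
final class BatchOnlySerializer implements Serializer<BatchStub> {
    public byte[] serialize(BatchStub b) { return new byte[0]; /* kryo write elided */ }
}

// Hybrid path: one entry point for both cases, paying a runtime type
// check per object -- the binding cost discussed in this thread.
final class HybridSerializer implements Serializer<Transferable> {
    private final TupleOnlySerializer tuples = new TupleOnlySerializer();
    private final BatchOnlySerializer batches = new BatchOnlySerializer();

    public byte[] serialize(Transferable obj) {
        if (obj instanceof TupleImplStub) {
            return tuples.serialize((TupleImplStub) obj);
        }
        return batches.serialize((BatchStub) obj);
    }
}
```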
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710268#comment-14710268 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134412604 @mjsax, I'm not sure it's not related. Here is the output from my benchmarks:

Pre-batching (apache master)
```
status   time           time-diff ms  transferred  throughput (MB/s)
WAITING  1440456969132  0             0            0.0
WAITING  1440456999132  30000         99560        0.031649271647135414
WAITING  1440457029131  29999         2073460      0.6591571546037212
RUNNING  1440457059131  30000         2043680      0.6496683756510416
RUNNING  1440457089132  30001         2000600      0.6359524140536461
RUNNING  1440457119133  30001         1921980      0.6109606221947549
RUNNING  1440457149131  29998         1846040      0.5868794369819967
RUNNING  1440457179132  30001         1847640      0.5873293603429365
RUNNING  1440457209133  30001         1688440      0.5367227301733171
RUNNING  1440457239131  29998         1733740      0.5511778482986105
RUNNING  1440457269131  30000         1751700      0.5568504333496094
RUNNING  1440457299133  30002         1748280      0.5557261962158252
RUNNING  1440457329132  29999         1706500      0.54249982364321
RUNNING  1440457359133  30001         1787720      0.568281940243919
RUNNING  1440457389131  29998         1717900      0.5461421121922451
RUNNING  1440457419132  30001         1779180      0.5655672378466291
RUNNING  1440457449132  30000         1639820      0.5212847391764323
RUNNING  1440457479131  29999         1743720      0.5543321374058823
RUNNING  1440457509134  30003         1681140      0.5343665767700574
RUNNING  1440457539133  29999         1716760      0.5457614985278155
RUNNING  1440457569131  29998         1700800      0.5407058061683279
RUNNING  1440457599132  30001         1754100      0.5575947863098574
RUNNING  1440457629134  30002         1669220      0.53059537445225
RUNNING  1440457659132  29998         1757240      0.5586487951735845
RUNNING  1440457689131  29999         1781520      0.5663488343491658
RUNNING  1440457719131  30000         1772920      0.5635960896809896
RUNNING  1440457749134  30003         1656020      0.5263819422907969
RUNNING  1440457779135  30001         1557820      0.49520113449017844
RUNNING  1440457809134  29999         1750760      0.5565701677360599
RUNNING  1440457839131  29997         1773740      0.5639131519760049
RUNNING  1440457869132  30001         1656360      0.5265251127371275
RUNNING  1440457899131  29999         1770140      0.5627311091847593
RUNNING  1440457929131  30000         1767480      0.5618667602539062
```

With batching, no typehints. (This pull request as of now)
```
status   time           time-diff ms  transferred  throughput (MB/s)
WAITING  1440455622424  0             0            0.0
WAITING  1440455652425  30001         87340        0.027763712807880363
WAITING  1440455682427  30002         652540       0.2074230512724933
RUNNING  1440455712428  30001         678920       0.21581566177611788
RUNNING  1440455742425  29997         729960       0.23207124179214797
RUNNING  1440455772425  30000         667320       0.21213531494140625
RUNNING  1440455802427  30002         670380       0.2130938564870415
RUNNING  1440455832423  29996         616080       0.19587267397371733
RUNNING  1440455862423  30000         645540       0.20521163940429688
RUNNING  1440455892428  30005         635080       0.2018528528122917
RUNNING  1440455922427  29999         622760       0.1979766716507738
RUNNING  1440455952428  30001         610080       0.19393274455955634
RUNNING  1440455982424  29996         633800       0.20150646144095255
RUNNING  1440456012423  29999         573820       0.18241854603161253
RUNNING  1440456042427  30004         596680       0.18965417648089627
RUNNING  1440456072423  29996         592880       0.18849660911819494
RUNNING  1440456102425  30002         558000       0.17737159807835728
RUNNING  1440456132424  29999         614140       0.19523635610444828
RUNNING  1440456162423  29999         580520       0.1845484896697
```
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710108#comment-14710108 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r37807820

```diff
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -658,40 +688,42 @@
         ;;(log-debug "Received tuple " tuple " at task " task-id)
         ;; need to do it this way to avoid reflection
-        (let [stream-id (.getSourceStreamId tuple)]
-          (condp = stream-id
-            Constants/CREDENTIALS_CHANGED_STREAM_ID
-              (let [task-data (get task-datas task-id)
-                    bolt-obj (:object task-data)]
-                (when (instance? ICredentialsListener bolt-obj)
-                  (.setCredentials bolt-obj (.getValue tuple 0))))
-            Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-            (let [task-data (get task-datas task-id)
-                  ^IBolt bolt-obj (:object task-data)
-                  user-context (:user-context task-data)
-                  sampler? (sampler)
-                  execute-sampler? (execute-sampler)
-                  now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
-              (when sampler?
-                (.setProcessSampleStartTime tuple now))
-              (when execute-sampler?
-                (.setExecuteSampleStartTime tuple now))
-              (.execute bolt-obj tuple)
-              (let [delta (tuple-execute-time-delta! tuple)]
-                (when (= true (storm-conf TOPOLOGY-DEBUG))
-                  (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
-                (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
-                (when delta
-                  (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
-                                                       executor-stats
-                                                       (.getSourceComponent tuple)
-                                                       (.getSourceStreamId tuple)
-                                                       delta)
-                  (stats/bolt-execute-tuple! executor-stats
-                                             (.getSourceComponent tuple)
-                                             (.getSourceStreamId tuple)
-                                             delta))))
+        (let [tuples (if (instance? TupleImpl tupleOrBatch) (list tupleOrBatch) (.buffer tupleOrBatch))]
+          (doseq [tuple tuples]
+            (let [stream-id (.getSourceStreamId tuple)]
```

--- End diff --

Thanks. However, this is not related to the decreased throughput I observed, because the spout output rate limits the throughput in my experiment.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710090#comment-14710090 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r37807094
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -658,40 +688,42 @@
[... same hunk as quoted in full in the previous comment ...]
+        (let [tuples (if (instance? TupleImpl tupleOrBatch) (list tupleOrBatch) (.buffer tupleOrBatch))]
+          (doseq [tuple tuples]
+            (let [stream-id (.getSourceStreamId tuple)]
--- End diff --
This line, for instance, is eating 9% of the CPU time. Profiler screenshot: https://cloud.githubusercontent.com/assets/1819836/9452988/143be13e-4a7d-11e5-812f-6c2eefd2b96e.png
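For context, the reflection fix being pointed at here looks roughly like this (a minimal sketch, not the PR's code): hinting the tuple as `backtype.storm.tuple.Tuple` lets the Clojure compiler emit a direct interface call instead of a reflective method lookup.
```
;; Without the ^Tuple hint, (.getSourceStreamId tuple) is resolved
;; reflectively on every call; with it, the compiler emits a direct
;; invokeinterface against the Tuple interface.
(import 'backtype.storm.tuple.Tuple)

(defn source-stream-id [^Tuple tuple]
  (.getSourceStreamId tuple))
```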
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710024#comment-14710024 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134373051
Keep an eye out for more missing types. Those are often a source of unexpectedly bad performance. I'm hesitant to add another thread, especially since we're operating on the critical path. We need to figure out some other way to flush.
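One mechanical way to keep an eye out for missing types is Clojure's built-in reflection warning flag; a small sketch:
```
;; Enable per namespace: every call site the compiler cannot resolve
;; statically is reported at compile time.
(set! *warn-on-reflection* true)

(defn stream-id-unhinted [tuple]
  (.getSourceStreamId tuple))                       ; emits a reflection warning

(defn stream-id-hinted [^backtype.storm.tuple.Tuple tuple]
  (.getSourceStreamId tuple))                       ; resolved statically, no warning
```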
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710018#comment-14710018 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134371947
Type hinting improves throughput by 10% :) But a 30% gap remains. About batches that don't fill up: we need to introduce a timeout (and a flushing thread or some other flushing strategy); otherwise a batch might starve. I just wanted to get the basic design right before taking care of this problem.
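A hypothetical sketch of one such flushing strategy (names like `flush-batch!`, the buffer shape, and the single scheduled thread are assumptions for illustration, not part of this PR; synchronization with the executor thread is deliberately omitted):
```
(import '[java.util.concurrent Executors ScheduledExecutorService TimeUnit])

;; Periodically flush partially filled batches so a batch holding
;; capacity-1 tuples cannot starve during a pause in the input.
(defn start-batch-flusher!
  [^java.util.Map output-batch-buffer flush-batch! interval-ms]
  (let [^ScheduledExecutorService pool (Executors/newSingleThreadScheduledExecutor)]
    (.scheduleAtFixedRate pool
                          (fn []
                            (doseq [[task-id batch] output-batch-buffer]
                              (flush-batch! task-id batch)))
                          (long interval-ms)
                          (long interval-ms)
                          TimeUnit/MILLISECONDS)
    pool))
```
Whether the flush runs on its own thread (as here) or is piggybacked onto the executor's event loop is exactly the trade-off raised in the previous comment, since a separate thread needs synchronized access to the batches.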
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710005#comment-14710005 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r37800211
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -445,6 +445,32 @@
         ret
         ))
+
+(defn init-batching-buffer [worker-context component-id]
+  (let [batchSize (.get_batch_size (.getComponentCommon worker-context component-id))]
+    (if (> batchSize 1)
+      (let [consumer-ids (flatten (for [cids (vals (.getTargets worker-context component-id))] (keys cids)))
+            consumer-task-ids (flatten (for [cid consumer-ids :when (not (.startsWith cid "__"))] (into '() (.getComponentTasks worker-context cid))))]
+        (HashMap. (zipmap consumer-task-ids (repeatedly (count consumer-task-ids) #(Batch. batchSize)))))
+      (HashMap.)
+      )))
+
+(defn emit-msg [out-task out-tuple overflow-buffer output-batch-buffer transfer-fn]
+  (let [out-batch (.get output-batch-buffer out-task)]
+    (if out-batch
--- End diff --
Your observation is right for the current state of the code. However, I would like to extend it with the possibility to use different batch sizes for different output streams (including mixed-mode batching/non-batching). Nevertheless, we could have 3 functions: non-batching, batching, and mixed-mode, and choose the correct function at setup time. The branching overhead is then avoided for both main cases (most of the time there is only a single output stream).
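A sketch of the three-way split described above (all names assumed, not the PR's code): the mode is classified once at setup, so only the genuinely mixed case pays a per-tuple branch.
```
;; batch-sizes is assumed to map stream-id -> configured batch size
;; for one component's output streams.
(defn classify-emit-mode [batch-sizes]
  (let [sizes (set (vals batch-sizes))]
    (cond
      (= sizes #{1})            :non-batching ; no stream batched: plain transfer
      (not (contains? sizes 1)) :batching     ; every stream batched
      :else                     :mixed-mode)))  ; only this mode branches per tuple

;; e.g. (classify-emit-mode {"default" 100})              => :batching
;;      (classify-emit-mode {"default" 1})                => :non-batching
;;      (classify-emit-mode {"default" 100, "metrics" 1}) => :mixed-mode
```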
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709805#comment-14709805 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r37785236
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -445,6 +445,32 @@
[... same hunk as quoted in full in an earlier comment above ...]
+(defn emit-msg [out-task out-tuple overflow-buffer output-batch-buffer transfer-fn]
+  (let [out-batch (.get output-batch-buffer out-task)]
+    (if out-batch
--- End diff --
Unless I misunderstand, this is going to end up being always true or always false, depending on the result of `.get_batch_size` for the component at startup. In that case, can we decide this when we're setting up the transfer function instead of each time we're emitting a tuple?
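Sketched under assumed names (a `java.util.List` stands in for the PR's `Batch` class), the suggestion amounts to a factory that resolves the branch once at setup and returns a closure for the hot path:
```
(defn mk-emit-fn
  [batch-size ^java.util.HashMap output-batch-buffer transfer-fn]
  (if (> batch-size 1)
    ;; batching path: append to the consumer task's buffer
    (fn [out-task out-tuple]
      (let [^java.util.List out-batch (.get output-batch-buffer out-task)]
        (.add out-batch out-tuple)))
    ;; non-batching path: hand the tuple straight to the transfer function
    (fn [out-task out-tuple]
      (transfer-fn out-task out-tuple))))
```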
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709796#comment-14709796 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134326789
What happens when Batch.capacity - 1 tuples are emitted and then there's a long pause in the input?
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709738#comment-14709738 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r37780743
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -445,6 +445,32 @@
[... same hunk as quoted in full in an earlier comment above ...]
+(defn emit-msg [out-task out-tuple overflow-buffer output-batch-buffer transfer-fn]
--- End diff --
Try type-hinting `output-batch-buffer` here. I haven't profiled it myself, but what you're saying as far as performance goes sounds like it could be the result of reflection. Type-hinting anything you're calling methods on should help.
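Concretely, the suggested hint would sit on the parameter; a minimal sketch:
```
;; With the ^java.util.HashMap hint, (.get output-batch-buffer out-task)
;; compiles to a direct virtual call; without it, every lookup goes
;; through clojure.lang.Reflector.
(defn lookup-batch
  [^java.util.HashMap output-batch-buffer out-task]
  (.get output-batch-buffer out-task))
```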
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709740#comment-14709740 ] ASF GitHub Bot commented on STORM-855: -- Github user knusbaum commented on a diff in the pull request: https://github.com/apache/storm/pull/694#discussion_r37780767
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -445,6 +445,32 @@
[... same hunk as quoted in full in an earlier comment above ...]
+  (let [out-batch (.get output-batch-buffer out-task)]
--- End diff --
Type-hinting `out-batch` should also help.
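The same applies to the local binding; a sketch where `java.util.List` again stands in for the PR's `Batch` class, whose methods would otherwise still be invoked reflectively:
```
(defn add-to-batch!
  [^java.util.HashMap output-batch-buffer out-task out-tuple]
  ;; hinting the local removes the remaining reflective call on .add
  (let [^java.util.List out-batch (.get output-batch-buffer out-task)]
    (.add out-batch out-tuple)))
```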
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709725#comment-14709725 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-134315867
Hi, I did a few performance tests and unfortunately the impact of my changes is quite high. Not using batching in my branch (compared to master) reduces throughput by 40%. :( I dug into the problem and it seems there are two critical points in the code.
First, I introduced the function `emit-msg` (in `executor.clj`). This function uses a Java `HashMap` (`output-batch-buffer`) and it branches. Both have a large negative impact. Can it be that access via `HashMap.get()` is slow in Clojure? What about branching? To me the performance impact seems ridiculously high (even taking into account that this code is called a few hundred thousand times per second). Branch prediction in particular should eliminate the branching overhead almost entirely. Because batching was disabled in the test, the else branch is taken every time...
Furthermore, I changed the serialization by overloading `KryoTuple(Batch)Serializer.serialize(...)` (I renamed this class). It seems that this overloading makes the call to `.serialize(Tuple)` much more expensive. I was thinking about changing the code such that, even if batching is disabled, I just use batches of size 1 to eliminate the overload. Of course, this comes with the drawback that a single tuple consumes more space, as I need to write the batch size in front of each batch.
Does this observation make sense? Or do you think I overlooked something?
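For illustration, the "batches of size 1" idea sketched with a plain vector standing in for the PR's `Batch` class: every emission is normalized to one shape, so serialization needs a single non-overloaded code path, at the cost of a length prefix per tuple.
```
;; Normalize: a bare tuple becomes a batch of size 1, so downstream code
;; (serialization included) only ever sees one shape.
(defn ->batch [tuple-or-batch]
  (if (sequential? tuple-or-batch)
    tuple-or-batch
    [tuple-or-batch]))

;; e.g. (->batch t)       => [t]
;;      (->batch [t1 t2]) => [t1 t2]
```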
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705897#comment-14705897 ] ASF GitHub Bot commented on STORM-855: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133200328
Sure, please take your time!
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705889#comment-14705889 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133198490
Thanks for your guidance! I will have a look at #521 and add some results. May take a few days...
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705886#comment-14705886 ] ASF GitHub Bot commented on STORM-855: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133197501
No, there's no specific approach. You can refer to @d2r's approach: https://github.com/apache/storm/pull/521#issuecomment-106038982 I think it would be interesting to compare these: not applying this patch; applying this patch with batching disabled; and applying this patch with various batch sizes.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705873#comment-14705873 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133194835
Sure. Is there any specific approach I should take? How should I add/report those results? I did a simple test already, using the `ExclamationTopology` example. Setting the batch size to 100 in the spout roughly doubled throughput and increased latency from 1 to 3 seconds. But this is of course not representative.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705871#comment-14705871 ] ASF GitHub Bot commented on STORM-855: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133194709
FYI, you can use https://github.com/yahoo/storm-perf-test for benchmarking if you don't have your own. You need to modify pom.xml to point it at a recent version of Storm, then build.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705861#comment-14705861 ] ASF GitHub Bot commented on STORM-855: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133193429
@mjsax I'm not familiar with Clojure either. Many committers will take a look, so don't worry. :) Before we take a look, I think you're encouraged to run benchmarks and attach the results, since this modifies the critical path, especially the latency vs. throughput trade-off.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705827#comment-14705827 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133187814
Great! Just a heads up: This is my first PR for Storm and I just started to learn Clojure. Please review very carefully. Looking forward to your feedback.
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705821#comment-14705821 ] ASF GitHub Bot commented on STORM-855: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133187367
asfbot recognizes your change and links. :)
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705818#comment-14705818 ] ASF GitHub Bot commented on STORM-855: -- Github user mjsax commented on the pull request: https://github.com/apache/storm/pull/694#issuecomment-133186983
I guess I need to close and re-open this PR to get the linkage to JIRA...
[jira] [Commented] (STORM-855) Add tuple batching
[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14587123#comment-14587123 ] Matthias J. Sax commented on STORM-855: ---
I started to work on this. You can find the current progress at: https://github.com/mjsax/storm/tree/batching It will take some time; I need to get started with Clojure first :) I will open a pull request once the code is more mature. Feedback is welcome any time.