[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710451#comment-14710451 ]
ASF GitHub Bot commented on STORM-855:
--------------------------------------

Github user mjsax commented on the pull request:

https://github.com/apache/storm/pull/694#issuecomment-134450197

I just pushed some changes (as a new commit, so you can see more easily what I changed):
- added type hints
- split tuple and batch serialization into separate classes
- assembled different "emit functions" in Clojure for the single-tuple and batch cases, to allow for more type hints (see the sketch below)
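To make the emit-function split concrete, here is a minimal sketch of the idea; the names (`mk-single-emit`, `mk-batch-emit`, `transfer-fn`) are hypothetical and not the actual PR code. The point is that a hint such as `^ArrayList` lets the Clojure compiler emit direct method calls on the hot path instead of falling back to reflection:

```clojure
;; Minimal sketch with hypothetical names -- not the actual PR code.
;; Separate emit constructors for the single-tuple and batch cases; the
;; ^ArrayList hint makes .add/.size/.clear direct calls, not reflective ones.
(ns example.batching-emit
  (:import [java.util ArrayList]))

(defn mk-single-emit
  "Returns an emit fn that forwards one tuple at a time."
  [transfer-fn]
  (fn [task tuple]
    (transfer-fn task tuple)))

(defn mk-batch-emit
  "Returns an emit fn that buffers tuples and ships a full batch at once.
   One buffer per emitter; assumes a single-threaded caller, as in an executor."
  [transfer-fn batch-size]
  (let [^ArrayList buffer (ArrayList. (int batch-size))]
    (fn [task tuple]
      (.add buffer tuple)
      (when (>= (.size buffer) (int batch-size))
        (transfer-fn task (ArrayList. buffer)) ; hand off a copy of the batch
        (.clear buffer)))))
```

A real implementation would additionally have to flush incomplete batches (e.g., on a timeout) so that low-rate streams do not stall, and would pick the single-tuple or batch emitter per output edge when the topology starts.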
I get the following results running on a 4-node cluster with the parameters: `storm jar storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --bolt 3 --name test -l 1 -n 1 --messageSize 4 --workers 4 --spout 1 --testTimeSec 300`

Master branch:
```
status topologies totalSlots slotsUsed totalExecutors executorsWithMetrics time time-diff ms transferred throughput (MB/s)
WAITING 1 48 0 4 0 1440466170638 0 0 0.0
WAITING 1 48 4 4 4 1440466200638 30000 6122840 0.7785593668619791
WAITING 1 48 4 4 4 1440466230638 30000 11565400 1.4706166585286458
RUNNING 1 48 4 4 4 1440466260638 30000 11394040 1.4488271077473958
RUNNING 1 48 4 4 4 1440466290638 30000 11718240 1.49005126953125
RUNNING 1 48 4 4 4 1440466320638 30000 11615920 1.4770406087239583
RUNNING 1 48 4 4 4 1440466350638 30000 11557380 1.4695968627929688
RUNNING 1 48 4 4 4 1440466380638 30000 11581080 1.4726104736328125
RUNNING 1 48 4 4 4 1440466410638 30000 11492600 1.4613596598307292
RUNNING 1 48 4 4 4 1440466440638 30000 11413760 1.4513346354166667
RUNNING 1 48 4 4 4 1440466470638 30000 11300580 1.4369430541992188
RUNNING 1 48 4 4 4 1440466500638 30000 11368760 1.4456125895182292
RUNNING 1 48 4 4 4 1440466530638 30000 11509820 1.463549296061198
```

Batching branch with batching disabled:
```
status topologies totalSlots slotsUsed totalExecutors executorsWithMetrics time time-diff ms transferred throughput (MB/s)
WAITING 1 48 0 4 0 1440467016767 0 0 0.0
WAITING 1 48 4 4 4 1440467046767 30000 7095940 0.9022954305013021
WAITING 1 48 4 4 4 1440467076767 30000 11136640 1.4160970052083333
RUNNING 1 48 4 4 4 1440467106767 30000 11159220 1.4189682006835938
RUNNING 1 48 4 4 4 1440467136767 30000 7757660 0.9864374796549479
RUNNING 1 48 4 4 4 1440467166767 30000 11375580 1.4464797973632812
RUNNING 1 48 4 4 4 1440467196767 30000 11669980 1.4839146931966145
RUNNING 1 48 4 4 4 1440467226767 30000 11344380 1.4425125122070312
RUNNING 1 48 4 4 4 1440467256767 30000 11521460 1.4650293986002605
RUNNING 1 48 4 4 4 1440467286767 30000 11401040 1.4497172037760417
RUNNING 1 48 4 4 4 1440467316767 30000 11493700 1.461499532063802
RUNNING 1 48 4 4 4 1440467346767 30000 11452680 1.4562835693359375
RUNNING 1 48 4 4 4 1440467376767 30000 11148300 1.4175796508789062
```

Batching branch with a batch size of 100 tuples:
```
status topologies totalSlots slotsUsed totalExecutors executorsWithMetrics time time-diff ms transferred throughput (MB/s)
WAITING 1 48 1 4 0 1440467461710 0 0 0.0
WAITING 1 48 4 4 4 1440467491710 30000 11686000 1.4859517415364583
WAITING 1 48 4 4 4 1440467521710 30000 18026640 2.292205810546875
RUNNING 1 48 4 4 4 1440467551710 30000 17936300 2.2807184855143228
RUNNING 1 48 4 4 4 1440467581710 30000 18969300 2.4120712280273438
RUNNING 1 48 4 4 4 1440467611710 30000 18581620 2.3627751668294272
RUNNING 1 48 4 4 4 1440467641711 30001 18963120 2.4112050268897285
RUNNING 1 48 4 4 4 1440467671710 29999 18607200 2.3661067022546587
RUNNING 1 48 4 4 4 1440467701710 30000 19333620 2.4583969116210938
RUNNING 1 48 4 4 4 1440467731710 30000 18629100 2.3688125610351562
RUNNING 1 48 4 4 4 1440467761711 30001 18847820 2.3965443624209923
RUNNING 1 48 4 4 4 1440467791710 29999 18021400 2.291615897287722
RUNNING 1 48 4 4 4 1440467821710 30000 18143360 2.3070475260416665
```

The negative impact is gone, and batching increases the output rate by about 50%. I need to do more tests, investigate the performance impact of input debatching, and test with acking enabled.

Some more questions:
- What about `assert-can-serialize`? Is it performance critical? I did not test it, but it seems a generic approach for tuples and batches should be good enough.
- I also do not understand the following code (line 648-622 in `executor.clj` in the batching branch). I used batching, but did not modify this code. Nevertheless, it works (I guess this part is not executed?). Can you explain?
```
(task/send-unanchored task-data ACKER-INIT-STREAM-ID [root-id (bit-xor-vals out-ids) task-id] overflow-buffer)
```
- What about batching `acks`? Would it make sense? I don't understand the acking code path well enough right now to judge, but since acking is quite expensive, it might be a good idea.

> Add tuple batching
> ------------------
>
>                 Key: STORM-855
>                 URL: https://issues.apache.org/jira/browse/STORM-855
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> In order to increase Storm's throughput, multiple tuples can be grouped together into a batch of tuples (i.e., a fat tuple) and transferred from producer to consumer at once.
> The initial idea is taken from https://github.com/mjsax/aeolus. However, we aim to integrate this feature deep into the system (in contrast to building it on top), which has multiple advantages:
> - batching can be even more transparent to the user (e.g., no extra direct streams are needed to mimic Storm's data distribution patterns)
> - fault tolerance (anchoring/acking) can be done at tuple granularity, not at batch granularity, which would lead to many more replayed tuples -- and result duplicates -- in case of failure
> The aim is to extend the TopologyBuilder interface with an additional parameter 'batch_size' to expose this feature to the user. By default, batching will be disabled.
> This batching feature is purely a tuple-transport optimization, i.e., tuple-by-tuple processing semantics are preserved. An output batch is assembled at the producer and completely disassembled at the consumer. The consumer's output can in turn be batched again, independently of whether its input was batched. Thus, batch sizes can differ for each producer-consumer pair, and a consumer can receive batches of different sizes from different producers (including regular, non-batched input).
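To illustrate the fat-tuple design described above, here is a minimal sketch; all names (`pack-batch`, `unpack-and-process`, `process-fn`) are hypothetical and not the proposed implementation:

```clojure
;; Minimal sketch with hypothetical names -- not the proposed implementation.
(ns example.fat-tuple)

(defn pack-batch
  "Producer side: assemble a collection of tuples into one 'fat tuple'
   that travels through the transport layer as a single message."
  [tuples]
  {:batch? true :tuples (vec tuples)})

(defn unpack-and-process
  "Consumer side: completely disassemble a fat tuple and apply process-fn
   to each contained tuple, so downstream logic (including anchoring and
   acking) still works at single-tuple granularity."
  [msg process-fn]
  (if (:batch? msg)
    (doseq [t (:tuples msg)]
      (process-fn t))
    (process-fn msg)))
```

Because the consumer branches on the message itself rather than on topology-wide configuration, it can mix batched and non-batched input from different producers, matching the per-edge batch-size design described above.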