[ https://issues.apache.org/jira/browse/STORM-855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710451#comment-14710451 ]

ASF GitHub Bot commented on STORM-855:
--------------------------------------

Github user mjsax commented on the pull request:

    https://github.com/apache/storm/pull/694#issuecomment-134450197
  
    I just pushed some changes (as a new commit, so you can see more easily what I changed):
     - added type hints
     - split tuple and batch serialization into separate classes
     - assembled different "emit functions" in Clojure for the single-tuple and the batch case, to allow for more type hints (a rough sketch of the idea follows below)
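
    Roughly, the assembled "emit function" idea looks like the following standalone sketch (all names are illustrative; this is not the actual code from the PR):
    ```
    ;; Decide once, at setup time, whether to emit single tuples or batches,
    ;; so the per-tuple hot path carries no extra branch.
    (defn mk-emit-fn [^long batch-size transfer-fn]
      (if (<= batch-size 1)
        ;; single-tuple case: hand each value list straight to the transfer layer
        (fn [task values]
          (transfer-fn task values))
        ;; batch case: buffer value lists and flush once the batch is full
        ;; (assumes the executor calls the emit function from a single thread)
        (let [^java.util.ArrayList buffer (java.util.ArrayList. (int batch-size))]
          (fn [task values]
            (.add buffer values)
            (when (>= (.size buffer) batch-size)
              (transfer-fn task (into [] buffer))
              (.clear buffer))))))
    ```
    With a batch size of 100, the returned closure performs one transfer per 100 emits instead of 100 individual transfers.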
    
    I get the following results running on a 4-node cluster with these parameters: `storm jar storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --bolt 3 --name test -l 1 -n 1 --messageSize 4 --workers 4 --spout 1 --testTimeSec 300`
    
    Master Branch:
    ```
    status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
    WAITING  1           48          0          4               0                     1440466170638  0             0            0.0
    WAITING  1           48          4          4               4                     1440466200638  30000         6122840      0.7785593668619791
    WAITING  1           48          4          4               4                     1440466230638  30000         11565400     1.4706166585286458
    RUNNING  1           48          4          4               4                     1440466260638  30000         11394040     1.4488271077473958
    RUNNING  1           48          4          4               4                     1440466290638  30000         11718240     1.49005126953125
    RUNNING  1           48          4          4               4                     1440466320638  30000         11615920     1.4770406087239583
    RUNNING  1           48          4          4               4                     1440466350638  30000         11557380     1.4695968627929688
    RUNNING  1           48          4          4               4                     1440466380638  30000         11581080     1.4726104736328125
    RUNNING  1           48          4          4               4                     1440466410638  30000         11492600     1.4613596598307292
    RUNNING  1           48          4          4               4                     1440466440638  30000         11413760     1.4513346354166667
    RUNNING  1           48          4          4               4                     1440466470638  30000         11300580     1.4369430541992188
    RUNNING  1           48          4          4               4                     1440466500638  30000         11368760     1.4456125895182292
    RUNNING  1           48          4          4               4                     1440466530638  30000         11509820     1.463549296061198
    ```
    
    Batching branch with batching disabled:
    ```
    status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
    WAITING  1           48          0          4               0                     1440467016767  0             0            0.0
    WAITING  1           48          4          4               4                     1440467046767  30000         7095940      0.9022954305013021
    WAITING  1           48          4          4               4                     1440467076767  30000         11136640     1.4160970052083333
    RUNNING  1           48          4          4               4                     1440467106767  30000         11159220     1.4189682006835938
    RUNNING  1           48          4          4               4                     1440467136767  30000         7757660      0.9864374796549479
    RUNNING  1           48          4          4               4                     1440467166767  30000         11375580     1.4464797973632812
    RUNNING  1           48          4          4               4                     1440467196767  30000         11669980     1.4839146931966145
    RUNNING  1           48          4          4               4                     1440467226767  30000         11344380     1.4425125122070312
    RUNNING  1           48          4          4               4                     1440467256767  30000         11521460     1.4650293986002605
    RUNNING  1           48          4          4               4                     1440467286767  30000         11401040     1.4497172037760417
    RUNNING  1           48          4          4               4                     1440467316767  30000         11493700     1.461499532063802
    RUNNING  1           48          4          4               4                     1440467346767  30000         11452680     1.4562835693359375
    RUNNING  1           48          4          4               4                     1440467376767  30000         11148300     1.4175796508789062
    ```
    
    Batching branch with batch size of 100 tuples:
    ```
    status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
    WAITING  1           48          1          4               0                     1440467461710  0             0            0.0
    WAITING  1           48          4          4               4                     1440467491710  30000         11686000     1.4859517415364583
    WAITING  1           48          4          4               4                     1440467521710  30000         18026640     2.292205810546875
    RUNNING  1           48          4          4               4                     1440467551710  30000         17936300     2.2807184855143228
    RUNNING  1           48          4          4               4                     1440467581710  30000         18969300     2.4120712280273438
    RUNNING  1           48          4          4               4                     1440467611710  30000         18581620     2.3627751668294272
    RUNNING  1           48          4          4               4                     1440467641711  30001         18963120     2.4112050268897285
    RUNNING  1           48          4          4               4                     1440467671710  29999         18607200     2.3661067022546587
    RUNNING  1           48          4          4               4                     1440467701710  30000         19333620     2.4583969116210938
    RUNNING  1           48          4          4               4                     1440467731710  30000         18629100     2.3688125610351562
    RUNNING  1           48          4          4               4                     1440467761711  30001         18847820     2.3965443624209923
    RUNNING  1           48          4          4               4                     1440467791710  29999         18021400     2.291615897287722
    RUNNING  1           48          4          4               4                     1440467821710  30000         18143360     2.3070475260416665
    ```
    
    The negative impact is gone, and batching increases the output rate by roughly 60% (steady-state average of ~1.46 MB/s on master vs. ~2.37 MB/s with a batch size of 100). I still need to run more tests, investigate the performance impact of de-batching the input at the consumer, and test with acking enabled.
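
    For reference, that figure comes from averaging the throughput column of the steady-state (RUNNING) rows of the two runs:
    ```
    ;; Quick sanity check on the tables above (throughput values rounded).
    (defn avg [xs] (/ (reduce + xs) (count xs)))

    (avg [1.4488 1.4901 1.4770 1.4696 1.4726
          1.4614 1.4513 1.4369 1.4456 1.4635])  ; => ~1.46 MB/s (master)

    (avg [2.2807 2.4121 2.3628 2.4112 2.3661
          2.4584 2.3688 2.3965 2.2916 2.3070])  ; => ~2.37 MB/s (batch size 100)
    ```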
    
    Some more questions:
     - What about `assert-can-serialize`? Is it performance critical? I did not test it, but it seems that a generic approach covering both tuples and batches should be good enough (see the first sketch after this list).
     - I also do not understand the following code (lines 648-622 of `executor.clj` in the batching branch). I use batching but did not modify this code; nevertheless, it works (I guess this part is not executed?). Can you explain?
    ```
    (task/send-unanchored task-data
                          ACKER-INIT-STREAM-ID
                          [root-id (bit-xor-vals out-ids) task-id]
                          overflow-buffer)
    ```
     - What about batching `acks`? Would it make sense? I don't understand the acking code path well enough right now to judge, but since acking is quite expensive, it might be a good idea (see the second sketch below).
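
    Regarding the first question, the generic approach I have in mind is roughly the following (an illustrative sketch; `serialize-fn` and `deserialize-fn` stand in for the actual Kryo-based calls):
    ```
    ;; Round-trip every value list, treating a single tuple as a batch of
    ;; size one; this throws if any value cannot be (de)serialized.
    (defn assert-can-serialize-generic [serialize-fn deserialize-fn batch]
      (doseq [values batch]
        (deserialize-fn (serialize-fn values))))
    ```
    And to make the last question concrete, batched acking could look something like this (a purely hypothetical sketch, not code from the PR; it would also need a time-based flush so acks are not delayed indefinitely):
    ```
    ;; Buffer [anchor-id xor-value] pairs and flush them to the acker as one
    ;; message instead of one send per ack. All names are illustrative only.
    (defn mk-batched-ack-fn [^long batch-size send-to-acker!]
      (let [^java.util.ArrayList pending (java.util.ArrayList. (int batch-size))]
        (fn [anchor-id xor-val]
          (.add pending [anchor-id xor-val])
          (when (>= (.size pending) batch-size)
            (send-to-acker! (into [] pending))
            (.clear pending)))))
    ```
    The acker would unpack such a batch and apply each xor individually, so tuple-level fault-tolerance semantics would stay intact.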


> Add tuple batching
> ------------------
>
>                 Key: STORM-855
>                 URL: https://issues.apache.org/jira/browse/STORM-855
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> In order to increase Storm's throughput, multiple tuples can be grouped together into a batch of tuples (i.e., a fat tuple) and transferred from producer to consumer at once.
> The initial idea is taken from https://github.com/mjsax/aeolus. However, we aim to integrate this feature deeply into the system (in contrast to building it on top), which has multiple advantages:
>   - batching can be even more transparent to the user (e.g., no extra direct streams are needed to mimic Storm's data distribution patterns)
>   - fault tolerance (anchoring/acking) can be done at tuple granularity (not at batch granularity, which would lead to many more replayed tuples -- and result duplicates -- in case of failure)
> The aim is to extend the TopologyBuilder interface with an additional parameter 'batch_size' to expose this feature to the user. By default, batching will be disabled.
> This batching feature serves pure tuple-transport purposes, i.e., tuple-by-tuple processing semantics are preserved. An output batch is assembled at the producer and completely disassembled at the consumer. The consumer's output can be batched again, independently of whether its input was batched. Thus, batches can have a different size for each producer-consumer pair. Furthermore, consumers can receive batches of different sizes from different producers (including regular, non-batched input).


