Github user mjsax commented on the pull request:

    https://github.com/apache/storm/pull/694#issuecomment-144203098
  
    I just pushed a new version (rebased to current master and squahed commits 
etc). The PR is in much cleaner state now. Looking forward to your feedback.
    
    Batching does now work with acks and each output stream can have a 
different batch size. The ack streams (`__ack_init`, `__ack_ack`, and 
`__ack_fail`) are treated as regular streams from a batching point of view. 
Metric and Eventlogger should work too (but I did not test is).
    
    If you want to test this, be aware that hybrid batching is **not 
supported** yet (for performance reasons -- at least for the non-acking case). 
Thus, right now you set batch size via and `int` only if acking is disabled or 
via `HashMap` and the map must contain an entry for each output stream 
(including ack streams) and the batch size must be at least 1. (After we 
decided how to proceed -- see next paragraph --, this can be cleaned up.)
    
    The problem with hybrid batching is the serialization path in 
*executor.clj* at `mk-transfer-fn` and `start-batch-transfer->worker-handler!`. 
I wrote the code for hybrid serialization already but disabled it, ie, put it 
into comments. Because, I am not able to set different serializers for 
different output stream, only a hybrid serializer could be used. However, the 
runtime binding to the correct method for TupleImpl or Batch reduced the 
throughput (see numbers below). Not sure if/how this could be resolved. On the 
other hand, a batch size of one does not have a big performance penalty -- 
maybe it would be worth to enable batching all the time (ie, even for the 
non-batching case, just use a batch size of one) to avoid the hybrid setup.
    
    I also did some network monitoring using `nmon`. It shows that batching 
does reduce the actually transfered number of byte over the network. The perf 
tool does not measure but compute the number of transfered bytes (what is not 
quite accurate). Right now, I don't have the numbers, but if you wish I could 
rerun those experiments and post here.
    
    I collected the following number for non-acking and acking:
    
    **NO ACKING**
    
    `--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 
--testTimeSec 40`
    
    no batching:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       11      0       1443539268669   0       
0       0.0
    WAITING     1       96      11      11      11      1443539298669   30000   
6688960 21.263631184895832
    WAITING     1       96      11      11      11      1443539328669   30000   
7518460 23.900540669759113
    RUNNING     1       96      11      11      11      1443539358669   30000   
10428980        33.15283457438151
    RUNNING     1       96      11      11      11      1443539388669   30000   
10395200        33.045450846354164
    ```
    
    batch size = 1 (to measure overhead; about 5%):
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       11      0       1443539471193   0       
0       0.0
    WAITING     1       96      11      11      11      1443539501193   30000   
3089120 9.820048014322916
    WAITING     1       96      11      11      11      1443539531193   30000   
9134740 29.038556416829426
    RUNNING     1       96      11      11      11      1443539561193   30000   
9502680 30.208206176757812
    RUNNING     1       96      11      11      11      1443539591193   30000   
9672300 30.747413635253906
    ```
    
    batch size = 100 (throughput improvement by about 85%)
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       11      0       1443539658994   0       
0       0.0
    WAITING     1       96      11      11      11      1443539688994   30000   
8345560 26.529820760091145
    WAITING     1       96      11      11      11      1443539718994   30000   
19876460        63.18556467692057
    RUNNING     1       96      11      11      11      1443539748994   30000   
18229880        57.95122782389323
    RUNNING     1       96      11      11      11      1443539778994   30000   
18294660        58.15715789794922
    ```
    
    **ACKING**
    
    `--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 
--testTimeSec 40 --ack --ackers 10`
    
    no batching:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       21      0       1443539868024   0       
0       0.0
    WAITING     1       96      21      21      21      1443539898024   30000   
864800  2.7491251627604165
    WAITING     1       96      21      21      21      1443539928024   30000   
1768760 5.6227366129557295
    RUNNING     1       96      21      21      21      1443539958024   30000   
1910340 6.072807312011719
    RUNNING     1       96      21      21      21      1443539988025   30001   
1888740 6.003942629809475
    ```
    Complete Latency (from WebUI): 6.256
    
    all batch sizes = 1 (to measure overhead; acking dominates; no overhead 
measurable):
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      2       21      0       1443540043224   0       
0       0.0
    WAITING     1       96      21      21      21      1443540073225   30001   
803060  2.552773895980811
    WAITING     1       96      21      21      21      1443540103225   30000   
2001520 6.362660725911458
    RUNNING     1       96      21      21      21      1443540133224   29999   
1789860 5.690001373255411
    RUNNING     1       96      21      21      21      1443540163224   30000   
1925420 6.120745340983073
    ```
    Complete Latency (from WebUI): 9.686 (no impact -- 3ms difference is just 
too small to be a reliable number)
    
    default batch size = 100 (almost no throughput improvement; about 15%; 
acking dominates as acks are not batched)
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       21      0       1443540446423   0       
0       0.0
    WAITING     1       96      21      21      21      1443540476423   30000   
866980  2.7560551961263022
    WAITING     1       96      21      21      21      1443540506423   30000   
2076100 6.599744160970052
    RUNNING     1       96      21      21      21      1443540536424   30001   
2100200 6.676133459939356
    RUNNING     1       96      21      21      21      1443540566424   30000   
2191620 6.966972351074219
    ```
    Complete Latency (from WebUI): 11.721 (compared to 6ms, almost doubles, but 
this is still tiny)
    
    all batch sizes = 100 (acks are batched too; throughput improvement 
increases to more than 50%)
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       21      0       1443540631845   0       
0       0.0
    WAITING     1       96      21      21      21      1443540661846   30001   
1728100 5.493298843977336
    WAITING     1       96      21      21      21      1443540691845   29999   
2814100 8.946081182035496
    RUNNING     1       96      21      21      21      1443540721845   30000   
2084440 6.6262563069661455
    RUNNING     1       96      21      21      21      1443540751845   30000   
2949700 9.376843770345053
    ```
    Complete Latency (from WebUI): 65.225 (latency increased by factor of 10 
due to ack batching)
    
    **===== Hybrid =====**
    I repeated the same experiment using hybrid serializer 
(`KryoTupleBatchSerializer`): It is a similar, however with reduced throughput 
by 25% for the non-acking case. In the acking case, the hybrid approach has no 
influence as acking dominates.
    
    **NO ACKING (hybrid)**
    
    `--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 
--testTimeSec 40`
    
    no batching:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       11      0       1443536488016   0       
0       0.0
    WAITING     1       96      11      11      11      1443536518016   30000   
5118760 16.27209981282552
    WAITING     1       96      11      11      11      1443536548017   30001   
8131680 25.84905291568406
    RUNNING     1       96      11      11      11      1443536578016   29999   
7960300 25.305955734820067
    RUNNING     1       96      11      11      11      1443536608016   30000   
7711800 24.515151977539062
    ```
    
    batch size = 1 (to measure overhead):
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       11      0       1443536668726   0       
0       0.0
    WAITING     1       96      11      11      11      1443536698726   30000   
4799100 15.255928039550781
    WAITING     1       96      11      11      11      1443536728726   30000   
7695580 24.463589986165363
    RUNNING     1       96      11      11      11      1443536758727   30001   
7828800 24.886255419090197
    RUNNING     1       96      11      11      11      1443536788726   29999   
5613780 17.84632089054661
    ```
    
    batch size = 100:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       11      0       1443536914666   0       
0       0.0
    WAITING     1       96      11      11      11      1443536944666   30000   
9114920 28.975550333658855
    WAITING     1       96      11      11      11      1443536974666   30000   
16459180        52.32232411702474
    RUNNING     1       96      11      11      11      1443537004666   30000   
16347100        51.96603139241537
    RUNNING     1       96      11      11      11      1443537034666   30000   
16594400        52.752176920572914
    ```
    
    **ACKING  (hybrid)**
    
    `--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 
--testTimeSec 40 --ack --ackers 10`
    
    no batching:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       21      0       1443538020486   0       
0       0.0
    WAITING     1       96      21      21      21      1443538050486   30000   
934640  2.9711405436197915
    WAITING     1       96      21      21      21      1443538080486   30000   
1774140 5.639839172363281
    RUNNING     1       96      21      21      21      1443538110486   30000   
1875420 5.961799621582031
    RUNNING     1       96      21      21      21      1443538140486   30000   
1832160 5.82427978515625
    ```
    Complete Latency (from WebUI): 43.945
    
    all batch sizes = 1 (to measure overhead):
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      3       21      0       1443538188807   0       
0       0.0
    WAITING     1       96      21      21      21      1443538218807   30000   
1192380 3.7904739379882812
    WAITING     1       96      21      21      21      1443538248807   30000   
1915040 6.087748209635417
    RUNNING     1       96      21      21      21      1443538278808   30001   
1387960 4.412058945365883
    RUNNING     1       96      21      21      21      1443538308807   29999   
2106580 6.696860700206934
    ```
    Complete Latency (from WebUI): 9.132
    
    default batch size = 100:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      5       21      0       1443538350612   0       
0       0.0
    WAITING     1       96      21      21      21      1443538380612   30000   
1449080 4.6065012613932295
    WAITING     1       96      21      21      21      1443538410613   30001   
2369020 7.5306607414843985
    RUNNING     1       96      21      21      21      1443538440612   29999   
2323580 7.386708117321359
    RUNNING     1       96      21      21      21      1443538470612   30000   
2414040 7.6740264892578125
    
    ```
    Complete Latency (from WebUI): 11.063
    
    all batch sizes = 100:
    ```
    status      topologies      totalSlots      slotsUsed       totalExecutors  
executorsWithMetrics    time    time-diff ms    transferred     throughput 
(MB/s)
    WAITING     1       96      0       21      0       1443538562562   0       
0       0.0
    WAITING     1       96      21      21      21      1443538592562   30000   
1278060 4.062843322753906
    WAITING     1       96      21      21      21      1443538622562   30000   
2170020 6.898307800292969
    RUNNING     1       96      21      21      21      1443538652563   30001   
2209060 7.022178545383122
    RUNNING     1       96      21      21      21      1443538682562   29999   
2155180 6.851361089477723
    ```
    Complete Latency (from WebUI): 122.958


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to