GitHub user mjsax commented on the pull request:
https://github.com/apache/storm/pull/694#issuecomment-144203098
I just pushed a new version (rebased to current master, squashed commits, etc.). The PR is in a much cleaner state now. Looking forward to your feedback.
Batching now works with acking, and each output stream can have a different batch size. The ack streams (`__ack_init`, `__ack_ack`, and `__ack_fail`) are treated as regular streams from a batching point of view. Metrics and the event logger should work too (but I did not test them).
If you want to test this, be aware that hybrid batching is **not supported** yet (for performance reasons, at least for the non-acking case). Thus, right now you set the batch size via an `int` only if acking is disabled, or via a `HashMap` that must contain an entry for each output stream (including the ack streams), where each batch size must be at least 1. (After we decide how to proceed, see next paragraph, this can be cleaned up.)
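To illustrate the two configuration styles, here is a minimal sketch from the topology side. The config key `topology.batch.size` is just a placeholder for this example (not necessarily the key used in the PR); the ack stream IDs are the real ones.
```java
import java.util.HashMap;
import backtype.storm.Config;

public class BatchConfigExample {
    public static Config buildConf(boolean ackingEnabled) {
        Config conf = new Config();
        if (!ackingEnabled) {
            // acking disabled: a single int sets the batch size for all output streams
            conf.put("topology.batch.size", 100);  // hypothetical key name
        } else {
            // acking enabled: one entry per output stream, including the ack streams;
            // every batch size must be at least 1
            HashMap<String, Integer> batchSizes = new HashMap<String, Integer>();
            batchSizes.put("default", 100);
            batchSizes.put("__ack_init", 1);
            batchSizes.put("__ack_ack", 1);
            batchSizes.put("__ack_fail", 1);
            conf.put("topology.batch.size", batchSizes);
        }
        return conf;
    }
}
```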
The problem with hybrid batching is the serialization path in *executor.clj* at `mk-transfer-fn` and `start-batch-transfer->worker-handler!`. I already wrote the code for hybrid serialization but disabled it, i.e., commented it out. Because I am not able to set different serializers for different output streams, only a hybrid serializer can be used. However, the runtime binding to the correct method for `TupleImpl` or `Batch` reduced the throughput (see numbers below). I am not sure if/how this could be resolved. On the other hand, a batch size of one does not carry a big performance penalty; maybe it would be worth enabling batching all the time (i.e., even for the non-batching case, just use a batch size of one) to avoid the hybrid setup.
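For clarity, a simplified sketch of what the hybrid dispatch boils down to; this is not the actual `KryoTupleBatchSerializer` code, and the stand-in types and method names below are made up for illustration:
```java
// Simplified sketch of the hybrid-serializer dispatch: because a single
// serializer instance must handle both plain tuples and batches, every
// outgoing element pays for a runtime type check before serialization.
public class HybridSerializerSketch {

    // stand-ins for backtype.storm.tuple.TupleImpl and the Batch type of this PR
    interface Message {}
    static class TupleImpl implements Message {}
    static class Batch implements Message {}

    byte[] serialize(Message msg) {
        if (msg instanceof Batch) {
            return serializeBatch((Batch) msg);        // batch path
        }
        return serializeTuple((TupleImpl) msg);        // single-tuple path
    }

    byte[] serializeBatch(Batch b) { return new byte[0]; }     // placeholder
    byte[] serializeTuple(TupleImpl t) { return new byte[0]; } // placeholder
}
```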
I also did some network monitoring using `nmon`. It shows that batching does reduce the actual number of bytes transferred over the network. The perf tool does not measure but computes the number of transferred bytes (which is not quite accurate). Right now I don't have those numbers, but if you wish I can rerun the experiments and post them here.
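For reference, the throughput column in the tables below appears to be derived from the tuple count and the configured message size rather than measured on the wire (which is why I call it not quite accurate). A sketch of that computation, assuming this is indeed how the perf tool derives it:
```java
// Sketch of how the reported throughput seems to be computed: tuple count
// times configured message size, so framing/header/batching overhead on the
// wire is not reflected (which nmon does capture).
public class ThroughputEstimate {
    public static void main(String[] args) {
        long transferred = 6688960L;   // tuples in one 30s window (first table below)
        int messageSize = 100;         // --messageSize 100 (bytes per tuple)
        long timeDiffMs = 30000L;

        double mbPerSec = transferred * (double) messageSize
                / (timeDiffMs / 1000.0) / (1024.0 * 1024.0);
        System.out.println(mbPerSec);  // ~21.26, matching the reported 21.2636...
    }
}
```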
I collected the following numbers for the non-acking and acking cases:
**NO ACKING**
`--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeSec 40`
no batching:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443539268669  0             0            0.0
WAITING  1           96          11         11              11                    1443539298669  30000         6688960      21.263631184895832
WAITING  1           96          11         11              11                    1443539328669  30000         7518460      23.900540669759113
RUNNING  1           96          11         11              11                    1443539358669  30000         10428980     33.15283457438151
RUNNING  1           96          11         11              11                    1443539388669  30000         10395200     33.045450846354164
```
batch size = 1 (to measure overhead; about 5%):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443539471193  0             0            0.0
WAITING  1           96          11         11              11                    1443539501193  30000         3089120      9.820048014322916
WAITING  1           96          11         11              11                    1443539531193  30000         9134740      29.038556416829426
RUNNING  1           96          11         11              11                    1443539561193  30000         9502680      30.208206176757812
RUNNING  1           96          11         11              11                    1443539591193  30000         9672300      30.747413635253906
```
batch size = 100 (throughput improvement of about 85%):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443539658994  0             0            0.0
WAITING  1           96          11         11              11                    1443539688994  30000         8345560      26.529820760091145
WAITING  1           96          11         11              11                    1443539718994  30000         19876460     63.18556467692057
RUNNING  1           96          11         11              11                    1443539748994  30000         18229880     57.95122782389323
RUNNING  1           96          11         11              11                    1443539778994  30000         18294660     58.15715789794922
```
**ACKING**
`--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeSec 40 --ack --ackers 10`
no batching:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          21              0                     1443539868024  0             0            0.0
WAITING  1           96          21         21              21                    1443539898024  30000         864800       2.7491251627604165
WAITING  1           96          21         21              21                    1443539928024  30000         1768760      5.6227366129557295
RUNNING  1           96          21         21              21                    1443539958024  30000         1910340      6.072807312011719
RUNNING  1           96          21         21              21                    1443539988025  30001         1888740      6.003942629809475
```
Complete Latency (from WebUI): 6.256 ms
all batch sizes = 1 (to measure overhead; acking dominates; no overhead
measurable):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          2          21              0                     1443540043224  0             0            0.0
WAITING  1           96          21         21              21                    1443540073225  30001         803060       2.552773895980811
WAITING  1           96          21         21              21                    1443540103225  30000         2001520      6.362660725911458
RUNNING  1           96          21         21              21                    1443540133224  29999         1789860      5.690001373255411
RUNNING  1           96          21         21              21                    1443540163224  30000         1925420      6.120745340983073
```
Complete Latency (from WebUI): 9.686 ms (no impact; the roughly 3 ms difference is too small to be a reliable number)
default batch size = 100 (almost no throughput improvement, about 15%; acking dominates as acks are not batched):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          21              0                     1443540446423  0             0            0.0
WAITING  1           96          21         21              21                    1443540476423  30000         866980       2.7560551961263022
WAITING  1           96          21         21              21                    1443540506423  30000         2076100      6.599744160970052
RUNNING  1           96          21         21              21                    1443540536424  30001         2100200      6.676133459939356
RUNNING  1           96          21         21              21                    1443540566424  30000         2191620      6.966972351074219
```
Complete Latency (from WebUI): 11.721 ms (almost double the 6 ms baseline, but still tiny)
all batch sizes = 100 (acks are batched too; throughput improvement increases to more than 50%):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          21              0                     1443540631845  0             0            0.0
WAITING  1           96          21         21              21                    1443540661846  30001         1728100      5.493298843977336
WAITING  1           96          21         21              21                    1443540691845  29999         2814100      8.946081182035496
RUNNING  1           96          21         21              21                    1443540721845  30000         2084440      6.6262563069661455
RUNNING  1           96          21         21              21                    1443540751845  30000         2949700      9.376843770345053
```
Complete Latency (from WebUI): 65.225 ms (latency increases by a factor of 10 due to ack batching)
**===== Hybrid =====**
I repeated the same experiments using the hybrid serializer (`KryoTupleBatchSerializer`). The results are similar, but throughput is reduced by about 25% in the non-acking case. In the acking case, the hybrid approach has no influence, as acking dominates.
**NO ACKING (hybrid)**
`--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeSec 40`
no batching:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443536488016  0             0            0.0
WAITING  1           96          11         11              11                    1443536518016  30000         5118760      16.27209981282552
WAITING  1           96          11         11              11                    1443536548017  30001         8131680      25.84905291568406
RUNNING  1           96          11         11              11                    1443536578016  29999         7960300      25.305955734820067
RUNNING  1           96          11         11              11                    1443536608016  30000         7711800      24.515151977539062
```
batch size = 1 (to measure overhead):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443536668726  0             0            0.0
WAITING  1           96          11         11              11                    1443536698726  30000         4799100      15.255928039550781
WAITING  1           96          11         11              11                    1443536728726  30000         7695580      24.463589986165363
RUNNING  1           96          11         11              11                    1443536758727  30001         7828800      24.886255419090197
RUNNING  1           96          11         11              11                    1443536788726  29999         5613780      17.84632089054661
```
batch size = 100:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          11              0                     1443536914666  0             0            0.0
WAITING  1           96          11         11              11                    1443536944666  30000         9114920      28.975550333658855
WAITING  1           96          11         11              11                    1443536974666  30000         16459180     52.32232411702474
RUNNING  1           96          11         11              11                    1443537004666  30000         16347100     51.96603139241537
RUNNING  1           96          11         11              11                    1443537034666  30000         16594400     52.752176920572914
```
**ACKING (hybrid)**
`--name test -l 1 -n 1 --messageSize 100 --workers 24 --spout 1 --bolt 10 --testTimeSec 40 --ack --ackers 10`
no batching:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          21              0                     1443538020486  0             0            0.0
WAITING  1           96          21         21              21                    1443538050486  30000         934640       2.9711405436197915
WAITING  1           96          21         21              21                    1443538080486  30000         1774140      5.639839172363281
RUNNING  1           96          21         21              21                    1443538110486  30000         1875420      5.961799621582031
RUNNING  1           96          21         21              21                    1443538140486  30000         1832160      5.82427978515625
```
Complete Latency (from WebUI): 43.945 ms
all batch sizes = 1 (to measure overhead):
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          3          21              0                     1443538188807  0             0            0.0
WAITING  1           96          21         21              21                    1443538218807  30000         1192380      3.7904739379882812
WAITING  1           96          21         21              21                    1443538248807  30000         1915040      6.087748209635417
RUNNING  1           96          21         21              21                    1443538278808  30001         1387960      4.412058945365883
RUNNING  1           96          21         21              21                    1443538308807  29999         2106580      6.696860700206934
```
Complete Latency (from WebUI): 9.132 ms
default batch size = 100:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          5          21              0                     1443538350612  0             0            0.0
WAITING  1           96          21         21              21                    1443538380612  30000         1449080      4.6065012613932295
WAITING  1           96          21         21              21                    1443538410613  30001         2369020      7.5306607414843985
RUNNING  1           96          21         21              21                    1443538440612  29999         2323580      7.386708117321359
RUNNING  1           96          21         21              21                    1443538470612  30000         2414040      7.6740264892578125
```
Complete Latency (from WebUI): 11.063 ms
all batch sizes = 100:
```
status   topologies  totalSlots  slotsUsed  totalExecutors  executorsWithMetrics  time           time-diff ms  transferred  throughput (MB/s)
WAITING  1           96          0          21              0                     1443538562562  0             0            0.0
WAITING  1           96          21         21              21                    1443538592562  30000         1278060      4.062843322753906
WAITING  1           96          21         21              21                    1443538622562  30000         2170020      6.898307800292969
RUNNING  1           96          21         21              21                    1443538652563  30001         2209060      7.022178545383122
RUNNING  1           96          21         21              21                    1443538682562  29999         2155180      6.851361089477723
```
Complete Latency (from WebUI): 122.958 ms