[ https://issues.apache.org/jira/browse/STORM-3314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747458#comment-16747458 ]
Roshan Naik edited comment on STORM-3314 at 1/20/19 1:22 PM: ------------------------------------------------------------- *Summary of the above two approaches:* +First approach+ tries to eliminate ACKer bolt by employing the existing spouts and bolts. It doesnt try to mitigate interworker ACK msgs. Also ACK msgs can get stuck behind regular msgs under BP. +Second approach+ is centered around mitigating interworker ACK msging. Retains need to tweak Acker count separately to handle more/lesser spout and bolts... most likely a fixed number of them in each worker. Both eliminate need for fields grouping of ACKs. Both need more thought on timeout resets. *Some data points:* * *Multiworker Speed:* In Storm 2, two components on +different workers+ can sustain a max throughput of ~*4 mill/sec* over the interworker msg-ing (w/o ACKing). * *ACKer bolt* speed (Single worker mode) : ** An ACKer bolt can sustain a max topo throughput of only ~*1.7 mill/sec* in *single worker* mode... when handling an ultra bare minimal topo of 1 spout and 1 bolt. ** ACKer slows down to *950k/sec* mill/sec with adding one more bolt in the mix (i.e. ConstSpoutIdBoltNullBoltTopo) * *ACKing + Multi worker:* For 2 workers (1 spout & 1 bolt) ACK mode throughput is at ~*900k/s.* Ideally, ACKing path within single worker should be faster than interworker msging, given the absence of de/serialization and network stack. Clearly both the ACKer bolt itself and the interworker msging are big factors... with the ACKer bolt being the more significant one. Mitigating only interworker msging, will likely leave a lot of potential performance on the table. Perhaps there is a hybrid approach that combine benefits of both approaches... reduce interworker and eliminate ACKer bolt. *Qs about [~kabhwan]'s approach:* # If I understand correctly, in single worker mode there will be no difference. And in multiworker mode, when the entire tuple tree falls within the same worker, as well as in single worker mode, the perf will be same ? # What is the method to compute "fully processed" after tuple tree is split into per worker fractions ? Sounds like ACKers will msg each other about completion of their partial tree ... and this will boil up to the root ACKER. *Additional Thoughts:* There appears to be two properties (which I think are true) that remain unexploited in the two approaches and a potential hybrid approach: - Within the worker process, the msging subsystem guarantees *delivery* and there can be no msg loss. - Message loss happens only when a worker goes down (or gets disconnected from the rest of the topo) was (Author: roshan_naik): *Summary of the above two approaches:* +First approach+ tries to eliminate ACKer bolt by employing the existing spouts and bolts. It doesnt try to mitigate interworker ACK msgs. Also ACK msgs can get stuck behind regular msgs under BP. +Second approach+ is centered around mitigating interworker ACK msging. Retains need to tweak Acker count separately to handle more/lesser spout and bolts... most likely a fixed number of them in each worker. Both eliminate need for fields grouping of ACKs. Both need more thought on timeout resets. *Some data points:* * *Multiworker Speed:* In Storm 2, two components on +different workers+ can sustain a max throughput of ~*4 mill/sec* over the interworker msg-ing (w/o ACKing). * *ACKer bolt* speed (Single worker mode):( ** An ACKer bolt can sustain a max topo throughput of only ~*1.7 mill/sec* in *single worker* mode... when handling an ultra bare minimal topo of 1 spout and 1 bolt. ** ACKer slows down to *950k/sec* mill/sec with adding one more bolt in the mix (i.e. ConstSpoutIdBoltNullBoltTopo) * *ACKing + Multi worker:* For 2 workers (1 spout & 1 bolt) ACK mode throughput is at ~*900k/s.* Ideally, ACKing path within single worker should be faster than interworker msging, given the absence of de/serialization and network stack. Clearly both the ACKer bolt itself and the interworker msging are big factors... with the ACKer bolt being the more significant one. Mitigating only interworker msging, will likely leave a lot of potential performance on the table. Perhaps there is a hybrid approach that combine benefits of both approaches... reduce interworker and eliminate ACKer bolt. *Qs about [~kabhwan]'s approach:* # If I understand correctly, in single worker mode there will be no difference. And in multiworker mode, when the entire tuple tree falls within the same worker, as well as in single worker mode, the perf will be same ? # What is the method to compute "fully processed" after tuple tree is split into per worker fractions ? Sounds like ACKers will msg each other about completion of their partial tree ... and this will boil up to the root ACKER. *Additional Thoughts:* There appears to be two properties (which I think are true) that remain unexploited in the two approaches and a potential hybrid approach: - Within the worker process, the msging subsystem guarantees *delivery* and there can be no msg loss. - Message loss happens only when a worker goes down (or gets disconnected from the rest of the topo) > Acker redesign > -------------- > > Key: STORM-3314 > URL: https://issues.apache.org/jira/browse/STORM-3314 > Project: Apache Storm > Issue Type: Improvement > Components: storm-client > Reporter: Roshan Naik > Priority: Major > > *Context:* The ACKing mechanism has come focus as one of the next major > bottlenecks to address. The strategy to timeout and replay tuples has issues > discussed in STORM-2359 > *Basic idea:* Every bolt will send an ACK msg to its upstream spout/bolt once > the tuples it emitted have been *fully processed* by downstream bolts. > *Determining "fully processed”* : For every incoming (parent) tuple, a bolt > can emit 0 or more “child” tuples. the Parent tuple is considered fully > processed once a bolt receives ACKs for all the child emits (if any). This > basic idea cascades all the way back up to the spout that emitted the root of > the tuple tree. > This means that, when a bolt is finished with all the child emits and it > calls ack() no ACK message will be generated (unless there were 0 child > emits). The ack() marks the completion of all child emits for a parent tuple. > The bolt will emit an ACK to its upstream component once all the ACKs from > downstream components have been received. > *Operational changes:* The existing spouts and bolts don’t need any change. > The bolt executor will need to process incoming acks from downstream bolts > and send an ACK to its upstream component as needed. In the case of 0 child > emits, ack() itself could immediately send the ACK to the upstream component. > Field grouping is not applied to ACK messages. > Total ACK messages: The spout output collector will no longer send an > ACK-init message to the ACKer bolt. Other than this, the total number of > emitted ACK messages does not change. Instead of the ACKs going to an ACKer > bolt, they get spread out among the existing bolts. It appears that this mode > may reduce some of the inter-worker traffic of ACK messages. > *Memory use:* If we use the existing XOR logic from ACKer bolt, we need about > 20 bytes per outstanding tuple-tree at each bolt. Assuming an average of say > 50k outstanding tuples at each level, we have 50k*20bytes = 1MB per bolt > instance. There may be room to do something better than XOR, since we only > need to track one level of outstanding emits at each bolt. > *Replay:* [needs more thinking] One option is to send REPLAY or TIMEOUT msgs > upstream. Policy of when to emit them needs more thought. Good to avoid > Timeouts/replays of inflight tuples under backpressure since this will lead > to "event tsunami" at the worst possible time. Ideally, if possible, replay > should be avoided unless tuples have been dropped. Would be nice to avoid > sending TIMEOUT_RESET msgs upstream when under backpressure ... since they > are likely to face backpressure as well. > On receiving an ACKs or REPLAYs from downstream components, a bolt needs to > clears the corresponding 20 bytes tracking info. > > *Concerns:* ACK tuples traversing upstream means it takes longer to get back > to Spout. > > > *Related Note:* +Why ACKer is slow ?:+ > Although lightweigh internally, the ACKer bolt has a huge impact on > throughput. The latency hit does not appear to be as significant. > I have some thoughts around why ACKer slows down throughput. > Consider the foll simple topo: > {{Spout ==> Bolt1 ==> Bolt2}} > If we add an ACKer to the above topo, the Acker bolt receives 3x more > incoming messages than the Bolt1 & Bolt2. Thats instantly a 3x hit on its > throughput on ACKer. Additionally each spout and bolt now emits 2 msgs > instead of 1 (if acker were absent). This slows down the spouts and bolts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)