[ 
https://issues.apache.org/jira/browse/STORM-3314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745145#comment-16745145
 ] 

Stig Rohde Døssing commented on STORM-3314:
-------------------------------------------

The core idea of avoiding crossing the worker boundary as much as possible 
sounds really good.

I think I like Jungtaek's iteration a little better.

I see the following benefits to the second idea over the first:
* Using a separate acker task in each worker means we don't have to worry about 
acks getting queued behind user tuples.
* The same worry would apply to timeout resets, and if resets could get stuck 
behind user tuples, we almost certainly couldn't do resetting reliably at all.
* As Jungtaek mentioned, backpressure would be a concern for acks if they went 
through the same queues as user tuples.
* We keep the ability to add more ackers in each worker if we want. 

My initial suggestion for how timeouts would work with this setup is that we 
move the timeout logic entirely to the acker. Since the spout and initial acker 
are now guaranteed to run in the same worker, we don't need the spout to time 
out tuples itself, and we don't need to worry about the first ACK_INIT getting 
lost. 
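
To make that concrete, here is a rough sketch of what acker-owned timeouts 
could look like, using rotating buckets similar to Storm's RotatingMap (class 
and method names are made up for illustration, not existing APIs):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical acker-side timeout tracking. Each bucket maps
// rootId -> current XOR of the tuple tree's ack values.
public class AckerTimeoutTracker {
    private final Deque<Map<Long, Long>> buckets = new ArrayDeque<>();

    public AckerTimeoutTracker(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
    }

    // Called on ACK_INIT and on every ack. Re-inserting into the freshest
    // bucket implicitly resets the tree's timeout; the acker would treat a
    // merged value of 0 as fully acked and remove the entry entirely.
    public void touch(long rootId, long ackVal) {
        Long previous = removeFromAnyBucket(rootId);
        long merged = (previous == null) ? ackVal : (previous ^ ackVal);
        buckets.peekLast().put(rootId, merged);
    }

    private Long removeFromAnyBucket(long rootId) {
        for (Map<Long, Long> bucket : buckets) {
            Long val = bucket.remove(rootId);
            if (val != null) {
                return val;
            }
        }
        return null;
    }

    // Called on a timer; entries still in the oldest bucket have gone
    // roughly a full message timeout without any ack and should be failed.
    public Map<Long, Long> expireOldest() {
        Map<Long, Long> expired = buckets.removeFirst();
        buckets.addLast(new HashMap<>());
        return expired; // caller tells the spout these trees timed out
    }
}
{code}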

When doing a timeout reset, if every acker keeps its own timeouts, we will need 
to walk up the tree from the worker that resets the timeout to the root acker. 
I think we might be able to reduce the number of reset messages here by having 
only the root acker time out tuples. We can use the sync tuple mechanism I 
suggested in STORM-2359 to make sure downstream ackers don't keep outdated 
anchors around. This way, a reset can jump straight to the root instead of 
having to walk the tree.
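
I'm picturing the sync mechanism working roughly like this: the root acker 
periodically broadcasts the set of root ids it still considers live, and every 
other acker prunes anything not in that set. A minimal sketch (names made up):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical state kept by a non-root acker.
public class DownstreamAckerState {
    // rootId -> pending XOR for anchors this acker tracks locally.
    private final Map<Long, Long> pendingAnchors = new HashMap<>();

    // Called when a sync tuple from the root acker arrives, carrying the set
    // of root ids the root acker still considers live. Anything else here is
    // stale (completed, failed, or timed out at the root) and can be dropped.
    public void onSyncTuple(Set<Long> liveRootIds) {
        pendingAnchors.keySet().retainAll(liveRootIds);
    }
}
{code}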

I'll have to think a bit more about automatic timeout resetting. Currently it 
can be made to work nicely because every ack resets the timeout, which makes it 
easy to reason about how long we can safely delay resetting tuples. This lets 
us decide not to reset tuples that have only been present in a worker for a 
short time, because we know the tree was likely acked recently, right before it 
was sent to the worker. If acks no longer hit the root acker, we might need to 
send some extra tuples to ensure timeouts are reset. Maybe we can get away with 
buffering acked anchors in each acker for a fraction of the message timeout, 
and sending them to the root acker as a single batch message. I'm a little 
worried about flooding the root acker with reset messages.
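
The batching I have in mind would look something like this, with the flush 
interval set to some fraction of the message timeout (the ResetBatcher name 
and the single batch message are hypothetical, not an existing API):

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical per-worker buffering of acked anchors, flushed to the root
// acker as one batch message instead of one reset message per ack.
public class ResetBatcher {
    private final Set<Long> ackedRoots = new HashSet<>();
    private final long flushIntervalMs; // fraction of the message timeout
    private long lastFlushMs = System.currentTimeMillis();

    public ResetBatcher(long flushIntervalMs) {
        this.flushIntervalMs = flushIntervalMs;
    }

    public void onAck(long rootId) {
        ackedRoots.add(rootId); // de-duplicates repeated acks for the same tree
    }

    // Called from the acker's timer loop.
    public void maybeFlush(Consumer<Set<Long>> sendToRootAcker) {
        long now = System.currentTimeMillis();
        if (now - lastFlushMs >= flushIntervalMs && !ackedRoots.isEmpty()) {
            sendToRootAcker.accept(new HashSet<>(ackedRoots)); // one batch msg
            ackedRoots.clear();
            lastFlushMs = now;
        }
    }
}
{code}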

Regarding FAIL tuples, if we decide to use sync tuples to keep the root acker 
and the other ackers in sync, we might be able to just send FAILs up the tree 
to the root acker, and let the sync tuple be responsible for removing the tree 
from any branches of the tuple path that didn't see the FAIL. I think trying to 
ensure all ackers are notified about the FAIL immediately would be hard.

> Acker redesign
> --------------
>
>                 Key: STORM-3314
>                 URL: https://issues.apache.org/jira/browse/STORM-3314
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-client
>            Reporter: Roshan Naik
>            Priority: Major
>
> *Context:* The ACKing mechanism has come into focus as one of the next major 
> bottlenecks to address. The strategy of timing out and replaying tuples has 
> issues, discussed in STORM-2359.
> *Basic idea:* Every bolt will send an ACK msg to its upstream spout/bolt once 
> the tuples it emitted have been *fully processed* by downstream bolts.
> *Determining "fully processed":* For every incoming (parent) tuple, a bolt 
> can emit 0 or more "child" tuples. The parent tuple is considered fully 
> processed once the bolt receives ACKs for all of its child emits (if any). 
> This basic idea cascades all the way back up to the spout that emitted the 
> root of the tuple tree.
> This means that when a bolt finishes all the child emits and calls ack(), no 
> ACK message is generated immediately (unless there were 0 child emits). The 
> ack() call marks the completion of all child emits for a parent tuple; the 
> bolt emits an ACK to its upstream component only once all the ACKs from 
> downstream components have been received.
> *Operational changes:* The existing spouts and bolts don't need any change. 
> The bolt executor will need to process incoming ACKs from downstream bolts 
> and send an ACK to its upstream component as needed. In the case of 0 child 
> emits, ack() itself could immediately send the ACK to the upstream component. 
> Fields grouping is not applied to ACK messages.
> *Total ACK messages:* The spout output collector will no longer send an 
> ACK_INIT message to the ACKer bolt. Other than this, the total number of 
> emitted ACK messages does not change. Instead of the ACKs going to an ACKer 
> bolt, they get spread out among the existing bolts. It appears that this mode 
> may reduce some of the inter-worker traffic of ACK messages.
> *Memory use:* If we use the existing XOR logic from the ACKer bolt, we need 
> about 20 bytes per outstanding tuple tree at each bolt. Assuming an average 
> of, say, 50k outstanding tuples at each level, we have 50k × 20 bytes = 1 MB 
> per bolt instance. There may be room to do something better than XOR, since 
> we only need to track one level of outstanding emits at each bolt.
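> A minimal sketch of the per-bolt XOR bookkeeping assumed above (hypothetical 
> names, not the current acker code):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical tracking of one level of outstanding child emits per bolt.
> public class ChildEmitTracker {
>     // parentId -> XOR of the ids of all unacked child emits (~20 bytes per
>     // outstanding tuple tree entry, including map overhead).
>     private final Map<Long, Long> pending = new HashMap<>();
>
>     public void onChildEmit(long parentId, long childId) {
>         pending.merge(parentId, childId, (a, b) -> a ^ b);
>     }
>
>     // Each acked child id XORs out its emit; when the value cancels to 0,
>     // all child emits are acked and the bolt can ACK upstream.
>     public boolean onChildAck(long parentId, long childId) {
>         Long v = pending.merge(parentId, childId, (a, b) -> a ^ b);
>         if (v != null && v == 0) {
>             pending.remove(parentId);
>             return true; // send the ACK to the upstream component now
>         }
>         return false;
>     }
> }
> {code}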
> *Replay:* [needs more thinking] One option is to send REPLAY or TIMEOUT msgs 
> upstream. The policy for when to emit them needs more thought. It would be 
> good to avoid timeouts/replays of in-flight tuples under backpressure, since 
> this will lead to an "event tsunami" at the worst possible time. Ideally, 
> replay should be avoided unless tuples have actually been dropped. It would 
> also be nice to avoid sending TIMEOUT_RESET msgs upstream when under 
> backpressure, since they are likely to face backpressure as well.
> On receiving an ACK or REPLAY from a downstream component, a bolt needs to 
> clear the corresponding 20 bytes of tracking info.
>  
> *Concerns:* ACK tuples traversing upstream hop by hop means it takes longer 
> for the ACK to get back to the spout.
>  
>  
> *Related Note:* +Why is the ACKer slow?+
> Although lightweight internally, the ACKer bolt has a huge impact on 
> throughput. The latency hit does not appear to be as significant.
> I have some thoughts on why the ACKer slows down throughput. 
> Consider the following simple topology: 
> {{Spout ==> Bolt1 ==> Bolt2}}
> If we add an ACKer to the above topology, the ACKer bolt receives 3x as many 
> incoming messages as Bolt1 or Bolt2: for every tuple tree it gets the 
> ACK_INIT from the spout plus one ACK from each bolt, while each bolt 
> processes just one tuple. That's instantly a 3x throughput hit at the ACKer. 
> Additionally, each spout and bolt now emits 2 msgs instead of 1 (versus 
> having no ACKer), which slows down the spouts and bolts.


