[ 
https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723055#comment-16723055
 ] 

Stig Rohde Døssing commented on STORM-2359:
-------------------------------------------

I've been taking a look at this, and have a proposal for how we could move the 
timeout entirely to the acker.

I'm going to assume in the following that tuples sent from one task to another 
are received in the order they were sent, ignoring message loss. I think this 
is the case, but please correct me if it isn't.
h6. The problem

Both the acker and spout executor currently have rotating pending maps, which 
time out tuples based on received ticks. The acker pending map simply discards 
timed-out tuples, while the spout pending map fails them when they are rotated 
out. Timeouts currently happen in both the acker and the spout because we need 
to ensure that the spout fails tuples that time out, even in the presence of 
message loss.
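Below is a minimal sketch of the rotating-bucket idea both components rely on. It assumes a fixed number of buckets and one rotation per tick. The class `RotatingPending` and its methods are hypothetical illustrations, not Storm's actual classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical rotating pending map: new entries go into the newest bucket and
 * each tick drops the oldest bucket. An entry expires on the numBuckets-th tick
 * after insertion, i.e. after between (numBuckets - 1) and numBuckets tick intervals.
 */
class RotatingPending<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    RotatingPending(int numBuckets) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("need at least 2 buckets");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addFirst(new HashMap<>());
        }
    }

    /** Track an entry in the newest bucket, removing any older copy first. */
    void put(K key, V value) {
        for (Map<K, V> b : buckets) {
            b.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    /** Remove an entry (e.g. because it was acked); returns its value or null. */
    V remove(K key) {
        for (Map<K, V> b : buckets) {
            if (b.containsKey(key)) {
                return b.remove(key);
            }
        }
        return null;
    }

    boolean containsKey(K key) {
        for (Map<K, V> b : buckets) {
            if (b.containsKey(key)) {
                return true;
            }
        }
        return false;
    }

    /** Called on each tick: drop the oldest bucket and return the entries that expired. */
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }
}
```

In the current design, the acker would just drop whatever `rotate()` returns, while the spout would fail each expired entry so it can be replayed.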

If we were to move timeouts entirely to the acker, there would be a potential 
for "lost" tuples (ones that end up not being reemitted) if messages between 
the acker and spout get lost.
 * The spout may emit a new tuple and try to notify the acker about it. If 
this message is lost, the acker doesn't know about the tuple, and thus can't 
time it out.
 * The acker may send an ack or fail message back to the spout, and that 
message may get lost. Since the acker currently deletes tuples as soon as they 
are acked/failed, the spout would never learn the outcome: the tuple would stay 
pending in the spout, and if it had failed, it would never be replayed.

h6. Suggested solution

We can move the timeout logic to the acker, and it will work out of the box as 
long as there are no lost messages. I think we can account for lost messages by 
having the spout executor periodically update its view of pending tuples based 
on the state in the acker.

Say the spout pending root ids are A, and the acker's pending root ids for that 
spout task are B. The spout can periodically (e.g. once per second) send each 
acker the root ids in A that the acker is responsible for. The acker should 
respond to this sync tuple by sending back A - B (it can optionally also delete 
anything in B that is not in A). When the sync response is received, the spout 
can safely fail any tuple in A - B that is still in the spout pending map.
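The exchange described above can be sketched as two set operations. This is a minimal illustration assuming root ids are `long` values and that B contains only the acker's pending root ids for the requesting spout task; `PendingSync`, `ackerRespond` and `idsToFail` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the pending-set sync between one spout task and one acker. */
class PendingSync {

    /**
     * Acker side: given A (the spout's pending root ids) and B (this acker's
     * pending root ids for that spout task), return A - B. If pruneStale is set,
     * also drop B - A in place, since no live spout instance tracks those ids.
     */
    static Set<Long> ackerRespond(Set<Long> spoutPending, Set<Long> ackerPending, boolean pruneStale) {
        Set<Long> missing = new HashSet<>(spoutPending);
        missing.removeAll(ackerPending);            // A - B
        if (pruneStale) {
            ackerPending.retainAll(spoutPending);   // removes B - A from the acker's state
        }
        return missing;
    }

    /**
     * Spout side: when the response arrives, fail only the ids in A - B that are
     * still pending. Anything acked or failed in the meantime has already left
     * the spout's pending set and is ignored.
     */
    static Set<Long> idsToFail(Set<Long> response, Set<Long> spoutPendingNow) {
        Set<Long> toFail = new HashSet<>(response);
        toFail.retainAll(spoutPendingNow);
        return toFail;
    }
}
```

For example, with A = {1, 2, 3} and B = {2, 3, 4}, the acker answers {1} and (when pruning) shrinks B to {2, 3}; the spout then fails root id 1 only if it is still pending at that point.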
 * If a tuple is in both A and B, it is still pending and shouldn't be removed. 
Returning only A - B ensures such tuples stay pending.
 * If a tuple is in A but not in B, then either the ack_init message was lost, 
the acker crashed and restarted, or the tuple has simply been acked or failed.
 ** If the ack_init was lost, or the acker crashed, then the spout should 
replay the tuple. Since A - B contains the tuple, the spout will fail it when 
it receives the response.
 ** If the tuple was acked or failed, then the acker is guaranteed to have sent 
the ack or fail message on to the spout before handling the sync tuple. Due to 
message ordering, the spout will process that message before the sync response 
and remove the tuple from its pending map, making the presence of the tuple in 
A - B irrelevant. If that message was itself lost, the tuple is still pending 
in the spout and will be failed and replayed, consistent with at-least-once 
processing.
 * If a tuple is in B but not in A, then the spout may have crashed. The acker 
can optionally just discard the pending tuple without notifying the spout, 
since notifying the spout about the state of a tuple emitted by a different 
instance is a no-op.
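The ordering argument in the case analysis can be checked with a small simulation. It assumes the acker-to-spout link may drop messages but never reorders them (FIFO). `OrderedChannelDemo` and its message type are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/**
 * Hypothetical simulation of the acker-to-spout link, assuming FIFO delivery
 * (messages may be lost, but are never reordered).
 */
class OrderedChannelDemo {
    /** A message from the acker: "ack" for completed root ids, or "sync" carrying A - B. */
    static final class Msg {
        final String type;
        final Set<Long> ids;

        Msg(String type, Set<Long> ids) {
            this.type = type;
            this.ids = ids;
        }
    }

    /**
     * The spout drains the channel in arrival order and returns the root ids it
     * fails. An ack that was sent before the sync response removes its tuple from
     * the pending set first, so the later response cannot fail that tuple.
     */
    static Set<Long> drain(Queue<Msg> channel, Set<Long> spoutPending) {
        Set<Long> failed = new HashSet<>();
        while (!channel.isEmpty()) {
            Msg m = channel.poll();
            if (m.type.equals("ack")) {
                spoutPending.removeAll(m.ids);      // tuple tree completed
            } else if (m.type.equals("sync")) {
                for (Long id : m.ids) {
                    if (spoutPending.remove(id)) {
                        failed.add(id);             // still pending: fail so it gets replayed
                    }
                }
            } else {
                throw new IllegalArgumentException("unknown message type " + m.type);
            }
        }
        return failed;
    }
}
```

Scenario: the spout has root ids {1, 2} pending. Tuple 1 was fully acked, so the acker sent an ack and deleted it; the ack_init for tuple 2 was lost, so the acker never knew about it. The sync response is therefore {1, 2}, but because the ack for 1 arrives first, the spout ends up failing only tuple 2.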

h6. Benefit

Moving the timeout logic to the acker makes it much cheaper to reset tuple 
timeouts, since the spout no longer needs to be notified directly. We could 
likely make the acker reset the timeout automatically any time it receives an 
ack.
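Below is a simplified sketch of an acker-side pending table where every update for a root id pushes its deadline forward. It assumes Storm-style XOR bookkeeping (the tree is complete when the accumulated XOR returns to 0) and passes time in explicitly. `ResettingAckerTimeouts`, `update` and `expire` are hypothetical names, not Storm's acker API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical acker pending table that resets a root's timeout on every ack. */
class ResettingAckerTimeouts {
    private static final class Pending {
        long ackVal;    // XOR of edge ids seen so far; 0 means the tuple tree is complete
        long deadline;  // time after which the root is considered timed out
    }

    private final long timeoutMillis;
    private final Map<Long, Pending> pending = new HashMap<>();

    ResettingAckerTimeouts(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Handle an ack_init or ack for rootId: XOR in the value and reset the
     * deadline. Returns true if the tuple tree is now complete (and removed).
     */
    boolean update(long rootId, long xorVal, long nowMillis) {
        Pending p = pending.computeIfAbsent(rootId, k -> new Pending());
        p.ackVal ^= xorVal;
        p.deadline = nowMillis + timeoutMillis;  // timeout restarts on every update
        if (p.ackVal == 0) {
            pending.remove(rootId);
            return true;
        }
        return false;
    }

    /** Remove and return every root id whose deadline has passed. */
    List<Long> expire(long nowMillis) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Pending> e = it.next();
            if (e.getValue().deadline <= nowMillis) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

The linear scan in `expire` keeps the sketch short; a rotating-bucket structure that moves an entry into the newest bucket on each update would give the same reset behaviour without scanning every pending root.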

 

Depending on overhead, we might be able to add an option to let Storm reset 
timeouts for tuples that are still being actively processed (i.e. received by a 
bolt but not yet acked/failed). This would help avoid the event tsunami problem 
described in the linked design doc, and could help prevent the kind of 
degenerate case described at 
https://issues.apache.org/jira/browse/STORM-2359?focusedCommentId=16043409&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16043409.
h6. Cost

This approach adds some overhead. The spout needs to keep track of which acker 
tasks are responsible for which root ids. There is also the (fairly minor) 
overhead of sending the sync tuples back and forth. In terms of processing 
reliability, a tuple the acker considers failed/timed out will be failed in the 
spout once one of the sync tuples makes it through.
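The bookkeeping cost mentioned above can be sketched as splitting the spout's pending root ids into one sync payload per acker task. This assumes, hypothetically, that the responsible acker is picked by hashing the root id modulo the number of ackers; `SyncBatcher`, `ackerFor` and `batchByAcker` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper that groups the spout's pending root ids by acker task so
 * that each acker only receives the ids it is responsible for.
 */
class SyncBatcher {
    /** Assumed routing rule: hash of the root id modulo the number of ackers. */
    static int ackerFor(long rootId, int numAckers) {
        return Math.floorMod(Long.hashCode(rootId), numAckers);
    }

    /** Build one list of root ids per acker index; ackers with nothing pending get no entry. */
    static Map<Integer, List<Long>> batchByAcker(Set<Long> pendingRootIds, int numAckers) {
        Map<Integer, List<Long>> batches = new HashMap<>();
        for (long id : pendingRootIds) {
            batches.computeIfAbsent(ackerFor(id, numAckers), k -> new ArrayList<>()).add(id);
        }
        return batches;
    }
}
```

If the routing rule is not something the spout can recompute, the alternative is to record the acker task per root id when the ack_init is sent, at the cost of one extra field per pending tuple.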

> Revising Message Timeouts
> -------------------------
>
>                 Key: STORM-2359
>                 URL: https://issues.apache.org/jira/browse/STORM-2359
>             Project: Apache Storm
>          Issue Type: Sub-task
>          Components: storm-core
>    Affects Versions: 2.0.0
>            Reporter: Roshan Naik
>            Priority: Major
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
>  
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
