[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745845#comment-16745845 ]

Roshan Naik commented on STORM-2359:

Yes indeed. It would be quite nice if the JCTools queues could support peek()ing and still be lock-free.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745154#comment-16745154 ]

Stig Rohde Døssing commented on STORM-2359:

Thanks, responded there.

Just to give an update on where I am with this, I took another look at whether we can avoid all the extra tracking of anchors in the critical path. It turns out we can, if the JCTools queues are updated to allow another thread to peek at the queue contents. I have a local branch of JCTools that seems like it can do this, so I'll suggest adding this method to the JCTools maintainers.

If we can peek at the queue contents, we can do resets for all queued tuples without adding any extra logic to the critical path. The timeout resetter thread can just look at the queue contents, without having to involve the critical path code in maintaining a ConcurrentHashMap of anchors. Resetting timeouts for queued tuples will let users reset timeouts for tuples that have been delivered to a bolt's execute() without risking queued tuples timing out (the method is effectively useless right now).

As a convenience, we can add a component-level configuration that will do automatic timeout resets for tuples that have been delivered to execute() but not yet acked or failed, using the same technique (a ConcurrentHashMap) as I proposed above for the worker. This solution would be much better than what I suggested before, because it means users can choose to enable the expensive tracking only for specific problem bolts, and only if they don't want to, or can't, manually call collector.resetTimeout. Resetting for queued tuples will be pretty cheap, because it doesn't add any slowdown to the critical path, and only produces reset tuples when processing is actually slow.
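To make the idea concrete, here is a minimal sketch of what such a resetter thread could look like if queue peeking were available. This is speculative: peekEach does not exist in JCTools today (it stands in for the peek support proposed above), and PeekableQueue, QueuedTimeoutResetter, and sendResetToAcker are invented names for illustration.

{code:java}
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the proposed JCTools peek support; no such
// method exists in JCTools at the time of writing.
interface PeekableQueue<T> {
    /** Lets an observer thread visit each queued element without dequeuing it. */
    void peekEach(Consumer<T> visitor);
}

final class QueuedTimeoutResetter implements Runnable {
    // One queue per executor in the worker; elements here stand in for the
    // anchor ids of queued tuples.
    private final List<PeekableQueue<Long>> executorQueues;

    QueuedTimeoutResetter(List<PeekableQueue<Long>> executorQueues) {
        this.executorQueues = executorQueues;
    }

    @Override
    public void run() {
        // Runs entirely off the critical path: queue producers and consumers
        // are never blocked and maintain no extra bookkeeping.
        for (PeekableQueue<Long> queue : executorQueues) {
            queue.peekEach(this::sendResetToAcker);
        }
    }

    private void sendResetToAcker(long anchor) {
        // Hand a reset-timeout message for this anchor to the responsible acker.
    }
}
{code}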
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744817#comment-16744817 ]

Roshan Naik commented on STORM-2359:

[~Srdo] FYI: just posted the initial thoughts around the acker redesign in STORM-3314.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16741281#comment-16741281 ]

Stig Rohde Døssing commented on STORM-2359:

{quote}if a msg is in the inputQ or pendingEmitsQ, then it is in progress and it cannot also be ACKed{quote}

Yes. However, since the JCTools queues don't support peeking at elements other than the head of the queue, we need to track in-progress messages separately somehow.

{quote}Overall it sounds like you want to track inflight tuples at each executor... which if true... is a big memory overhead{quote}

Not exactly. I want to track the anchors (i.e. some longs) of the tuples that are in flight in the worker. We don't need to keep track of the tuple itself. I basically just want to keep a count for each anchor. When a tuple becomes pending, we increment the count for each of its anchors. When the tuple is acked/failed, or removed from pendingEmits (depending on whether the tuple is inbound or outbound), we decrement the count for each anchor. The reset timeout thread can then periodically send resets for all anchors with a non-zero count.

Example: the worker receives tuple t1 with anchors [1, 3, 6] from Netty. It increments the counts for keys 1, 3 and 6 in the ConcurrentHashMap, and adds the tuple to the inbound queue for the receiving executor. When the receiving task processes the tuple, it acks or fails it. During ack/fail, it decrements the counts for anchors 1, 3 and 6 (the map implementation removes keys that reach 0). The reset thread periodically sends reset messages for each anchor in the ConcurrentHashMap.

Regarding the implementation, I agree. The performance with ConcurrentHashMap doesn't look too awful to me, but maybe we can do better. Since we're only trying to keep a running count for the anchors, maybe we can push the map code into another thread, and have the critical path just insert increment/decrement commands into a queue. I'll take a look at whether this is doable and how it performs.

Regarding an alternative acking design, I'd be happy if we could avoid this issue entirely with a new design. Please do describe your thoughts.
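A minimal sketch of the anchor-count multiset described above, using ConcurrentHashMap the way the comment proposes (the class and method names are invented for illustration, not the branch's actual code):

{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Per-worker multiset of in-flight anchors: a running count per anchor id,
// with keys removed as soon as their count reaches zero.
final class InFlightAnchors {
    private final ConcurrentHashMap<Long, Integer> counts = new ConcurrentHashMap<>();

    /** Called when a tuple becomes pending, e.g. enqueued for an executor. */
    void onPending(long[] anchors) {
        for (long anchor : anchors) {
            counts.merge(anchor, 1, Integer::sum);
        }
    }

    /** Called when a tuple is acked/failed, or flushed from pendingEmits. */
    void onComplete(long[] anchors) {
        for (long anchor : anchors) {
            // Returning null removes the key once its count drops to zero.
            counts.computeIfPresent(anchor, (k, v) -> v == 1 ? null : v - 1);
        }
    }

    /** The reset thread periodically sends resets for every anchor in this view. */
    Set<Long> inProgress() {
        return counts.keySet();
    }
}
{code}

In the example above, receiving t1 would call onPending(new long[] {1, 3, 6}), and the later ack or fail would call onComplete with the same anchors, leaving the map empty again.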
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16740143#comment-16740143 ]

Roshan Naik commented on STORM-2359:

Got a chance to read through your posts more carefully again. Your scheme to keep the timeout computation in the ACKer by having the acker send back (A - B) to the spout seems sound.

I am unable to grasp the timeout reset strategy though. These two statements in particular: "For the inbound queue, the anchor is no longer in progress when the associated tuple is acked or failed. For the outbound queue (pendingEmits in the Executor), the anchor is no longer in progress when the associated tuple gets flushed from pendingEmits." I am thinking... if a msg is in the inputQ or pendingEmitsQ, then it is *in progress* and it cannot also be ACKed. Overall it sounds like you want to track inflight tuples at each executor... which, if true... is a big memory overhead.

On the implementation side, one key concern is that in the 2.0 perf re-architecture (STORM-2306) a good amount of effort was spent tuning the critical path by cutting back on hashmaps and eliminating locks, both of which are perf killers. On a very quick scan of the code, I noticed the timeout strategy is introducing ConcurrentHashMap into the critical path... which introduces both locks and hashmaps in one go.

Btw... since you have reactivated this topic on ACKing... it's probably worth bringing up that [~kabhwan] and myself were rethinking the ACKing design sometime back. Since the ACKer bolt has a severe perf impact in Storm core... we were looking at a way to do acking without the ACKer bolt. As you can imagine, it would impact the timeouts as well. I can post the general idea in another JIRA and see what you think... either continue down this path for now and later replace it with a newer model, or investigate a newer model directly.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739641#comment-16739641 ]

Stig Rohde Døssing commented on STORM-2359:

I think it makes sense to open a WIP PR for further discussion?
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739639#comment-16739639 ]

Stig Rohde Døssing commented on STORM-2359:

I've been taking a look at the feasibility of automatically resetting timeouts for tuples that are still being processed, and I think we can do it without much overhead.

The idea is to track the anchor ids of each non-system message that enters an executor in/out queue. For the inbound queue, the anchor is no longer in progress when the associated tuple is acked or failed. For the outbound queue (pendingEmits in the Executor), the anchor is no longer in progress when the associated tuple gets flushed from pendingEmits. Occasionally a thread will check the set of in-progress anchors for the worker and send reset messages for all of them to the relevant ackers. In order to avoid sending too many messages, this thread snapshots the anchor set when it runs, and only sends reset messages for anchors that have been in progress sufficiently long in that worker. Since there may be more than one tuple per anchor, anchors are tracked as a count in a multiset, rather than just by presence in a set.

I've updated the spreadsheet with benchmark numbers for TVL (ThroughputVsLatency) with this functionality enabled. For the 90k example I also did a run where the grace period is disabled, to show the penalty for sending resets in the worst case, i.e. all in-progress tuples have their timeouts reset every time the resetter thread runs.

The code is available at https://github.com/srdo/storm/tree/auto-reset-timeout. Only the latest commit is new.
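A sketch of the snapshot-and-grace-period logic described above, reusing the InFlightAnchors multiset sketched earlier on this page (names are illustrative): an anchor only gets a reset if it was already in progress on the previous run, i.e. it has been in flight for at least one full interval.

{code:java}
import java.util.Set;
import java.util.function.LongConsumer;

// Periodic resetter with a grace period: snapshot the in-progress anchors,
// and send resets only for anchors that also appeared in the previous
// snapshot. Disabling the grace period would mean resetting every anchor in
// the current snapshot on every run (the worst case measured above).
final class GracePeriodResetter {
    private final InFlightAnchors anchors;
    private final LongConsumer sendResetToAcker;
    private Set<Long> previousSnapshot = Set.of();

    GracePeriodResetter(InFlightAnchors anchors, LongConsumer sendResetToAcker) {
        this.anchors = anchors;
        this.sendResetToAcker = sendResetToAcker;
    }

    /** Invoked periodically, e.g. from a scheduled background thread. */
    void runOnce() {
        Set<Long> current = Set.copyOf(anchors.inProgress());
        for (long anchor : current) {
            if (previousSnapshot.contains(anchor)) {
                sendResetToAcker.accept(anchor);
            }
        }
        previousSnapshot = current;
    }
}
{code}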
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730920#comment-16730920 ]

Stig Rohde Døssing commented on STORM-2359:

The contents of the message are a list of longs (anchor ids). The spout sends the list of anchors it thinks are pending to the acker. The acker removes from the list any anchors that are actually pending, and returns the list of anchors to fail.

Say the spout has emitted messages with anchor ids 1, 3, 4 to acker 1 and message 2 to acker 2. Say acker 1 has timed out message 1, and acker 2 has crashed and restarted since receiving message 2. The spout will send [1,3,4] to acker 1, and [2] to acker 2. The ackers remove from the received lists any anchor ids they recognize and return the modified lists to the spout. So acker 1 returns [1] and acker 2 returns [2]. When the spout receives the response tuple, it fails the tuples with the received anchor ids, so 1 and 2 will fail, while 3 and 4 remain pending.
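A minimal sketch of the acker-side computation just described (the names are illustrative, not Storm's actual API):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class SyncHandler {
    // Returns A - B: every anchor the spout still considers pending (A) that
    // the acker has no record of (B). The spout fails whatever comes back.
    static List<Long> anchorsToFail(List<Long> spoutPending, Set<Long> ackerPending) {
        List<Long> toFail = new ArrayList<>();
        for (Long anchor : spoutPending) {
            if (!ackerPending.contains(anchor)) {
                toFail.add(anchor);
            }
        }
        return toFail;
    }
}
{code}

With the example above, anchorsToFail(List.of(1L, 3L, 4L), Set.of(3L, 4L)) returns [1] for acker 1, and anchorsToFail(List.of(2L), Set.of()) returns [2] for the restarted acker 2.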
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730881#comment-16730881 ]

Roshan Naik commented on STORM-2359:

I see your point about having the timeout logic in the acker. It's a good one! Sorry for the delay in my response; I am traveling, so I'm unable to look into the code. Could you elaborate on the contents of the message being exchanged for computing A - B?
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730657#comment-16730657 ]

Stig Rohde Døssing commented on STORM-2359:

I made a few more changes, mainly to remove the pending map from the spout. It looks like there is now no real difference in performance between timeouts in the acker and timeouts in the spout. I've updated the document with benchmarks, please take a look.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726614#comment-16726614 ]

Stig Rohde Døssing commented on STORM-2359:

{quote}Acker will be totally clueless if all acks from downstream bolts are also lost for the same tuple tree.{quote}

Yes. This is why the sync tuple is needed. If the spout executor doesn't time out the tree on its own, then we need a mechanism to deal with trees where the acker isn't aware of the tree due to message loss of the init and ack/fail messages. If we don't do a sync, the spout will never know that the acker isn't aware of the tree, and the tree will never fail.

{quote}1) Handle timeouts in SpoutExecutor. It sends a timeout msg to ACKer. *+Benefit+*: May not be any better than doing in ACKer. Do you have any thoughts about this?{quote}

I don't think there's any benefit to doing this over what we're doing now. Currently the spout executor times out the tuple, and the acker doesn't try to fail tuples on timeout. Instead it just quietly discards whatever information it has about a tree once the pending map rotates a few times. I'm not sure what we'd gain from the acker not rotating pending and relying on a timeout tuple instead. The benefit as I see it of moving the timeouts to the acker would be the ability to reset timeouts more frequently (e.g. on every ack) without increasing load on the spout executor, which we can't do if the spout is still handling the timeout.

{quote}2) Eliminate timeout communication between Spout & ACKer. Let each do its own timeout.{quote}

This is how it works now. As you note, there are some benefits to doing it this way. An additional benefit is that we can easily reason about the max time to fail a tuple on timeout, since the tuple will fail as soon as the spout rotates it out of pending. The drawbacks are:
* Any time the timeout needs to be reset, a message needs to go to both the acker and the spout (the current path is bolt -> acker -> spout).
* Since resetting timeouts is reasonably expensive, we don't do it as part of regular acks; the user has to manually call collector.resetTimeout().

The benefit of moving timeouts entirely to the acker is that we can reset timeouts automatically on ack. This means that the tuple timeout becomes somewhat easier to work with, since tuples won't time out as long as an edge in the tree is getting acked occasionally.

This is currently more of a loose idea, but in the long run I'd like to try adding a feature toggle for more aggressive automatic timeout resets. I'm not sure what the performance penalty would be, but it would be nice if the bolts could periodically reset the timeout for all queued/in-progress tuples (tuples received by the worker, but not yet acked/failed by a collector). Doing this would eliminate the degenerate case you mention in the design doc, where some bolt is taking slightly too long to process a tuple, and the queued tuples time out, which causes the spout to reemit, which puts the reemits in queue behind the already timed out tuples. In some cases this can cause the topology to spend all its time processing timed out tuples, preventing it from making progress, even though each individual tuple could have been processed within the timeout. If this idea turns out to be possible to implement without adding a large overhead, it would add an extra drawback to timeouts in the spout:
* We can't do automatic timeout resetting, since flooding the spout with reset messages is a bad idea. In particular, we can't reset timeouts for messages that are sitting in bolt queues. The user can't reset these timeouts manually either.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726414#comment-16726414 ]

Roshan Naik commented on STORM-2359:

{quote}If we were to move timeouts entirely to the acker, there would be a potential for "lost" tuples (ones that end up not being reemitted) if messages between the acker and spout get lost.
* The spout may emit a new tuple, and try to notify the acker about it. If this message is lost, the acker doesn't know about the tuple, and thus can't time it out.{quote}

Acker will be totally clueless if all acks from downstream bolts are also lost for the same tuple tree.

{quote}We can move the timeout logic to the acker,{quote}

Alternatives:

1) Handle timeouts in SpoutExecutor. It sends a timeout msg to ACKer.
*+Benefit+*: May not be any better than doing in ACKer. Do you have any thoughts about this?

2) Eliminate timeout communication between Spout & ACKer. Let each do its own timeout.
*+Benefit+*: Eliminates the need for:
* periodic sync-up msgs (also avoids the possibility of latency spikes in multi-worker mode when these msgs get large),
* the A - B calculation, as well as
* timeout msg exchanges.

TimeoutReset msgs can still be supported.

-roshan
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723067#comment-16723067 ]

Stig Rohde Døssing commented on STORM-2359:

There's a branch implementing this at [https://github.com/srdo/storm/tree/STORM-2359-experimentation].

I did a couple of runs of the ConstSpoutNullBoltTopology from storm-perf, as well as the ThroughputVsLatency topology. I'm seeing an overhead of about 10% for the ConstSpoutNullBoltTopology, since the spout and ackers are the only components involved. For the ThroughputVsLatency topology, the overhead looks to be negligible. I've attached the raw numbers and some charts.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723070#comment-16723070 ]

Stig Rohde Døssing commented on STORM-2359:

I will check what impact it has to reinsert tuples into the acker's pending map when acks are received.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723055#comment-16723055 ]

Stig Rohde Døssing commented on STORM-2359:

I've been taking a look at this, and have a proposal for how we could move the timeout entirely to the acker. I'm going to assume in the following that tuples sent from one task to another are received in the order they were sent, ignoring message loss. I think this is the case, but please correct me if it isn't.

h6. The problem

Both the acker and spout executor currently have rotating pending maps, which time out tuples based on received ticks. The acker pending map just discards the tuples, while the spout pending map fails them if they get rotated out. The reason this currently happens in both acker and spout is that we need to ensure that the spout fails tuples if they time out, even in the presence of message loss. If we were to move timeouts entirely to the acker, there would be a potential for "lost" tuples (ones that end up not being reemitted) if messages between the acker and spout get lost.
* The spout may emit a new tuple, and try to notify the acker about it. If this message is lost, the acker doesn't know about the tuple, and thus can't time it out.
* The acker may send an ack or fail message back to the spout, which may get lost. Since the acker currently deletes acked/failed messages as soon as they are acked/failed, this would prevent the message from being replayed.

h6. Suggested solution

We can move the timeout logic to the acker, and it will work out of the box as long as there are no lost messages. I think we can account for lost messages by having the spout executor periodically update its view of pending tuples based on the state in the acker.

Say the spout pending root ids are A, and the acker pending root ids are B. The spout can periodically (e.g. once per second) send to each acker the root ids in A. The acker should respond to this tuple by sending back A - B (it can optionally also delete anything in B that is not in A). The spout can safely fail any tuple in A - B that is still in the spout pending map when the sync tuple response is received.
* If a tuple is in A and B, it is still pending, and shouldn't be removed. Returning only A - B ensures pending tuples remain.
* If a tuple is in A but not B, the ack_init message was lost, the acker may have crashed and restarted, or the tuple has simply been acked or failed.
** If the ack_init was lost, or the acker crashed, then the spout should replay the message. Since A - B contains the tuple, the spout will fail it when it receives the response.
** If the tuple was acked, then the acker is guaranteed to have sent the ack message on to the spout before handling the sync tuple. Due to message ordering, the ack will be processed before the sync tuple response, making the presence of the tuple in A - B irrelevant.
* If a tuple is not in A but in B, then the spout may have crashed. The acker can optionally just discard the pending tuple without notifying the spout, since notifying the spout about the state of a tuple emitted by a different instance is a no-op.

A sketch of the spout side of this exchange follows below.

h6. Benefit

Moving the timeout logic to the acker makes it much cheaper to reset tuple timeouts, since the spout no longer needs to be notified directly. We could likely make the acker reset the timeout automatically any time it receives an ack. Depending on overhead, we might be able to add an option to let Storm reset timeouts for tuples that are still being actively processed (i.e. received by a bolt but not yet acked/failed). This would be beneficial to avoid the event tsunami problem described in the linked design doc. It could help prevent the type of degenerate case described at https://issues.apache.org/jira/browse/STORM-2359?focusedCommentId=16043409&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16043409.

h6. Cost

There is a bit of extra overhead to doing this. The spout needs to keep track of which acker tasks are responsible for which root ids. There is also the (fairly minor) overhead of sending the sync tuples back and forth. In terms of processing reliability, a tuple the acker considers failed/timed out will be failed in the spout once one of the sync tuples makes it through.
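A rough sketch of the spout side of the sync, under assumptions: SpoutSyncer, pendingByAckerTask, emitSyncTuple, and failIfStillPending are invented names for illustration, not the experimentation branch's actual code. (The acker side, computing A - B, is sketched earlier on this page.)

{code:java}
import java.util.List;
import java.util.Map;

// Spout-executor side of the periodic sync. A is this spout's pending root
// ids, grouped by the acker task each root id was assigned to.
final class SpoutSyncer {
    private final Map<Integer, List<Long>> pendingByAckerTask;

    SpoutSyncer(Map<Integer, List<Long>> pendingByAckerTask) {
        this.pendingByAckerTask = pendingByAckerTask;
    }

    /** Runs periodically, e.g. once per second. */
    void onSyncTimer() {
        for (Map.Entry<Integer, List<Long>> entry : pendingByAckerTask.entrySet()) {
            emitSyncTuple(entry.getKey(), entry.getValue()); // send A to that acker
        }
    }

    /** Handles an acker's response, which carries A - B. */
    void onSyncResponse(List<Long> toFail) {
        for (long rootId : toFail) {
            // Fail only if still pending locally: a regular ack/fail for this
            // root id may have arrived first, given per-link ordering.
            failIfStillPending(rootId);
        }
    }

    private void emitSyncTuple(int ackerTask, List<Long> pendingRootIds) {
        // Emit the sync tuple to the given acker task.
    }

    private void failIfStillPending(long rootId) {
        // Remove from the local pending map and invoke the spout's fail().
    }
}
{code}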
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045442#comment-16045442 ]

Stig Rohde Døssing commented on STORM-2359:

I think the reason it would be tough to move timeouts entirely to the ackers is that we'd need to figure out how to deal with message loss between the spout and acker when the acker sends a timeout message to the spout. The current implementation errs on the side of caution by always reemitting if it can't positively say that a tuple has been acked. I'm not sure how we could do the same when the acker has to notify the spout to reemit after the timeout, because that message could be lost.

It might be a good idea, as you mention, to instead have two timeouts: a short one for the acker and a much longer one for the spout. It would probably mean that messages where the acker init message is lost will take much longer to retry than messages that are lost elsewhere, but it might allow us to keep timeout resets out of the spout.

Tuples can be lost if a worker died, but what if there's a network issue? Can't messages also be lost then?
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045053#comment-16045053 ]

Roshan Naik commented on STORM-2359:

{quote}The current implementation has a pending map in both the acker and the spout, which rotate every topology.message.timeout.secs.{quote}

Need to see if we can eliminate the timeout logic from the spout and have it only in the ACKer (I can think of some issues). If we must retain that logic in the spouts, the timeout value that it operates on (full tuple tree processing) would have to be separated from the timeout value that the ACKer uses to track progress between stages.

{quote}The spout then reemitted the expired tuples, and they got into the queue behind their own expired instances.{quote}

Perfect example indeed. The motivation of this JIRA is to try to eliminate/mitigate triggering of timeouts for queued/inflight tuples that are not lost. The only time we need timeouts/reemits to be triggered is when one or more tuples in the tuple tree are truly lost. *I think* that can only happen if a worker/bolt/spout died. So the case you're describing should not happen if we solve this problem correctly.

IMO, the ideal solution would have the spouts reemit only the specific tuples whose tuple trees had some loss due to a worker going down. I am not yet certain whether or not the initial idea described in the doc is the optimal solution. Perhaps a better way is to trigger such re-emits only if a worker/bolt/spout went down.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043409#comment-16043409 ]

Stig Rohde Døssing commented on STORM-2359:

{quote}The resets are managed internally by the ACKer bolt. The spout only gets notified if the timeout expires or if tuple-tree is fully processed.{quote}

How will this work? The current implementation has a pending map in both the acker and the spout, which rotate every topology.message.timeout.secs. If the acker doesn't forward reset requests to the spout, the spout will just expire the tuple tree on its own when the message timeout has passed.

{quote}That case would be more accurately classified as "progress is being made" ... but slower than expected. The case of 'progress is not being made' is when a worker that is processing part of the tuple tree dies.{quote}

Yes, you are right. But it is currently possible for the topology to degrade to no progress being made, even if each individual tuple could be processed under the message timeout, because tuples can expire while queued and get reemitted, where they can then be delayed by their own duplicates ahead of them in the queue. For IRichBolts, this can be mitigated by writing the bolt to accept and queue tuples internally, where the bolt can then reset their timeouts manually if necessary, but for IBasicBolt this is not possible.

Just to give a concrete example, we had an IBasicBolt enrich tuples with some database data. Most tuples were processed very quickly, but a few were slow. Even the slow tuples never took longer than our message timeout individually. We then had an instance where a bunch of slow tuples happened to come in on the stream close to each other. The first few were processed before they expired, but the rest expired while queued. The spout then reemitted the expired tuples, and they got into the queue behind their own expired instances. Since the bolt won't skip expired tuples, the freshly emitted tuples also expired, which caused another reemit. This repeated until the topology was restarted so the queues could be cleared.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041851#comment-16041851 ]

Roshan Naik commented on STORM-2359:

{quote}In order to reduce load on the spout, we should try to "bundle up" these resets in the acker bolts before sending them to the spout{quote}

The resets are managed internally by the ACKer bolt. The spout only gets notified if the timeout expires or if the tuple tree is fully processed.

{quote}When tuples are stuck in queues behind other tuples{quote}

That case would be more accurately classified as "progress is being made"... but slower than expected. The case of 'progress is not being made' is when a worker that is processing part of the tuple tree dies.

{quote}If a bolt is taking longer to process a tuple than expected, it can be solved in the concrete bolt implementation by using OutputCollector.resetTimeout at an appropriate interval (e.g. the tuple timeout minus a few seconds).{quote}

I think you are trying to mitigate the number of resets being sent to the spout... but as I mentioned before, resets are never sent to the spout.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035647#comment-16035647 ]

Roshan Naik commented on STORM-2359:

Sorry, just noticed your comments. Will go through them and respond next week.
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027906#comment-16027906 ]

Stig Rohde Døssing commented on STORM-2359:

I thought a bit more about this. Here are the situations I could think of where tuples are currently being expired without being lost:
* The tuple tree is still making progress, and tuples are getting acked, but the time to process the entire tree is larger than the tuple timeout. This is very likely to happen if there's congestion somewhere in the topology.
* The tuple tree is not making progress, because a bolt is currently processing a tuple in the tree, and processing is taking longer than expected.
* The tuple tree is not making progress, because the tuple(s) are stuck in queues behind other slow tuples. This is also very likely to happen if there's congestion in the topology.

The situation where there's still progress being made can be solved by resetting the tuple timeout whenever an ack is received. In order to reduce load on the spout, we should try to "bundle up" these resets in the acker bolts before sending them to the spout. I think a decent way to do this bundling is to make the acker bolt keep track of which tuples it has received acks for since the last time timeouts were reset. When a configured interval expires, the bolt empties out the list of live tuples, and sends timeout resets for all of them to the spout. The interval should probably be specified as a percentage of the tuple timeout. (See the sketch below.)

If a bolt is taking longer to process a tuple than expected, this can be solved in the concrete bolt implementation by using OutputCollector.resetTimeout at an appropriate interval (e.g. the tuple timeout minus a few seconds).

When tuples are stuck in queues behind other tuples, the topology can have a hard time recovering. This is because the expiration timer starts ticking for a tuple as soon as it's emitted, so if the bolt queues are congested, the bolts may be spending all their time processing tuples that belong to expired tuple trees. In order to solve this, we need to reset timeouts for queued tuples from time to time. It should be possible to add a thread that peeks at the available messages in the DisruptorQueue at some interval, and resets the timeout for any messages that were also queued the last time the thread ran. Only sending the resets once a tuple has been queued for the entire interval should help decrease the number of unnecessary resets sent to the spout. We should be able to reuse the interval configuration added for the acker bolt.

I'd welcome any feedback on these ideas :)
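A sketch of the acker-side bundling described above (hypothetical names; the flush interval would come from the percentage-of-timeout configuration just mentioned):

{code:java}
import java.util.HashSet;
import java.util.Set;

// The acker accumulates the root ids it has seen acks for since the last
// flush, then sends one batch of timeout resets to the spout when the
// configured interval expires.
final class AckerResetBundler {
    private final Set<Long> ackedSinceLastFlush = new HashSet<>();

    /** Called whenever the acker processes an ack for a tuple tree. */
    void onAck(long rootId) {
        ackedSinceLastFlush.add(rootId);
    }

    /** Called when the configured interval (a percentage of the tuple timeout) expires. */
    void onIntervalExpired() {
        for (long rootId : ackedSinceLastFlush) {
            sendResetToSpout(rootId); // one timeout reset per live tuple tree
        }
        ackedSinceLastFlush.clear();
    }

    private void sendResetToSpout(long rootId) {
        // Emit a reset-timeout message to the spout task that owns this root id.
    }
}
{code}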
[jira] [Commented] (STORM-2359) Revising Message Timeouts
[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930644#comment-15930644 ]

Stig Rohde Døssing commented on STORM-2359:

This is a great idea; we've also seen topologies exhibit bad performance when under pressure, because a lot of the queued tuples are already considered timed out by the spout.

It seems like part of the reason the tuple timeout is currently a set value that is not automatically reset is to avoid flooding the spout instances with ack messages. The ackers can be scaled out to handle a large number of acks, and can then notify the spout of acks only once per tuple tree. I'm assuming we want to keep the management of tuple tree ack/fail inside the spout(?), so have you considered a way to deduplicate or "bundle up" the ack stream in the acker before notifying the spout that the timeout should be reset? Maybe only notify once a percentage of the timeout has elapsed?

The bolt heartbeating may be able to reuse some code from https://issues.apache.org/jira/browse/STORM-1549