[ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16741281#comment-16741281 ]
Stig Rohde Døssing commented on STORM-2359:
-------------------------------------------

{quote}if a msg is in the inputQ or pendingEmitsQ, then it is in progress and it cannot also be ACKed{quote}

Yes. However, since the JCTools queues only support peeking at the head of the queue, we need to track in-progress messages separately somehow.

{quote}Overall it sounds like you want to track inflight tuples at each executor... which if true... is a big memory overhead{quote}

Not exactly. I want to track the anchors (i.e. some longs) of the tuples that are in flight in the worker; we don't need to keep track of the tuples themselves. I basically just want to keep a count for each anchor. When a tuple becomes pending, we increment the count for each of its anchors. When the tuple is acked/failed, or removed from pendingEmits (depending on whether the tuple is inbound or outbound), we decrement the count for each anchor. The reset timeout thread can then periodically send resets for all anchors with a non-zero count.

Example: the worker receives tuple t1 with anchors [1, 3, 6] from Netty. It increments the counts for keys 1, 3 and 6 in the ConcurrentHashMap, and adds the tuple to the inbound queue of the receiving executor. When the receiving task processes the tuple, it acks or fails it. During ack/fail, it decrements the counts for anchors 1, 3 and 6 (the map implementation removes keys whose count reaches 0). The reset thread periodically sends reset messages for each anchor still in the ConcurrentHashMap.

Regarding the implementation, I agree. The performance with ConcurrentHashMap doesn't look too awful to me, but maybe we can do better. Since we're only keeping a running count per anchor, maybe we can push the map updates onto another thread, and have the critical path just insert increment/decrement commands into a queue. I'll take a look at whether this is doable and how it performs.
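The counting scheme described above could be sketched roughly as follows. This is a hypothetical illustration, not Storm code: the class and method names are made up, and `ConcurrentHashMap.merge`/`compute` are used so that a key is dropped automatically when its count reaches zero.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-worker table of in-flight anchor counts.
public class AnchorCounts {
    private final ConcurrentHashMap<Long, Integer> counts = new ConcurrentHashMap<>();

    // Called when a tuple enters the worker (e.g. when it is read off Netty
    // or placed in pendingEmits): bump the count for each of its anchors.
    public void incrementAll(long[] anchors) {
        for (long anchor : anchors) {
            counts.merge(anchor, 1, Integer::sum);
        }
    }

    // Called when the tuple is acked/failed or removed from pendingEmits.
    // compute removes the key entirely when the count would reach 0.
    public void decrementAll(long[] anchors) {
        for (long anchor : anchors) {
            counts.compute(anchor, (k, v) -> (v == null || v <= 1) ? null : v - 1);
        }
    }

    // The reset thread would periodically send timeout resets for these.
    public Set<Long> liveAnchors() {
        return counts.keySet();
    }
}
```

With this shape, the reset thread just iterates `liveAnchors()` on a timer; no tuple references are retained, only the long anchor ids and their counts.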
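The single-writer alternative mentioned at the end could look something like this. Again a hypothetical sketch, not Storm code: the critical path only enqueues increment/decrement commands, and one dedicated thread (e.g. the reset thread, before each sweep) drains the queue into a plain HashMap it owns exclusively, avoiding CAS contention on the hot path.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical single-writer variant of the anchor-count table.
public class AnchorCountOffload {
    // A command is an anchor id plus a delta of +1 or -1.
    record Command(long anchor, int delta) {}

    private final BlockingQueue<Command> commands = new ArrayBlockingQueue<>(1024);
    private final Map<Long, Integer> counts = new HashMap<>(); // drain-thread only

    // Hot path: just enqueue, no shared-map access.
    public void increment(long anchor) { commands.offer(new Command(anchor, 1)); }
    public void decrement(long anchor) { commands.offer(new Command(anchor, -1)); }

    // Run only on the dedicated thread, e.g. before each reset sweep.
    // merge drops the key when the count returns to 0.
    public void drain() {
        Command c;
        while ((c = commands.poll()) != null) {
            counts.merge(c.anchor(), c.delta(), (a, b) -> (a + b == 0) ? null : a + b);
        }
    }

    // Safe to read only from the drain thread.
    public Set<Long> liveAnchors() {
        return counts.keySet();
    }
}
```

Whether this beats ConcurrentHashMap would depend on queue contention and on how full the command queue gets between sweeps; that is exactly the measurement proposed above.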
Regarding an alternative acking design, I'd be happy if we could avoid this issue entirely with a new design. Please do describe your thoughts.

> Revising Message Timeouts
> -------------------------
>
>                 Key: STORM-2359
>                 URL: https://issues.apache.org/jira/browse/STORM-2359
>             Project: Apache Storm
>          Issue Type: Sub-task
>          Components: storm-core
>    Affects Versions: 2.0.0
>            Reporter: Roshan Naik
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>         Attachments: STORM-2359-with-auto-reset.ods, STORM-2359.ods
>
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)