[ 
https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16741281#comment-16741281
 ] 

Stig Rohde Døssing commented on STORM-2359:
-------------------------------------------

{quote}
if a msg is in the inputQ or pendingEmitsQ, then it is in progress and it 
cannot also be ACKed
{quote}
Yes. However, since the JCTools queues don't support peeking at elements other 
than the head of the queue, we need to track in-progress messages separately 
somehow.

{quote}
Overall it sounds like you want to track inflight tuples at each executor... 
which if true... is a big memory overhead
{quote}
Not exactly. I want to track the anchors (i.e. some longs) of the tuples that 
are in flight in the worker. We don't need to keep track of the tuple itself. I 
basically just want to keep a count for each anchor. When a tuple becomes 
pending, we increment the count for each of its anchors. When the tuple is 
acked/failed, or removed from pendingEmits (depending on whether the tuple is 
inbound or outbound), we decrement the count for each anchor. The reset timeout 
thread can then just send resets for all anchors with a non-zero count 
periodically.

Example: 
The worker receives tuple t1 with anchors [1, 3, 6] from Netty. It increments 
the counts for key 1, 3 and 6 in the ConcurrentHashMap, and adds the tuple to 
the inbound queue for the receiving executor. When the receiving task processes 
the tuple, it acks or fails it. During ack/fail, it decrements the counts for 
anchors 1, 3, and 6 (the map implementation removes keys whose count reaches 0). The 
reset thread periodically sends reset messages for each anchor in the 
ConcurrentHashMap. 
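A minimal sketch of what that counting could look like (the class and method 
names here are hypothetical, for illustration only, not actual Storm code):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch of per-anchor reference counting for in-flight tuples.
// AnchorCounter, onPending and onComplete are made-up names.
class AnchorCounter {
    private final ConcurrentMap<Long, Integer> counts = new ConcurrentHashMap<>();

    // Called when a tuple enters the worker (inbound) or pendingEmits (outbound).
    void onPending(long[] anchors) {
        for (long anchor : anchors) {
            counts.merge(anchor, 1, Integer::sum);
        }
    }

    // Called on ack/fail (inbound) or removal from pendingEmits (outbound).
    // Must be paired with a prior onPending for the same anchors.
    // Returning null from the remapping function removes the key,
    // so anchors whose count reaches 0 drop out of the map.
    void onComplete(long[] anchors) {
        for (long anchor : anchors) {
            counts.merge(anchor, -1, (old, dec) -> old + dec == 0 ? null : old + dec);
        }
    }

    // The reset thread would periodically walk this set and send a
    // timeout reset for every anchor still in flight.
    Set<Long> inFlightAnchors() {
        return counts.keySet();
    }
}
```

For tuple t1 with anchors [1, 3, 6], the worker would call 
onPending(new long[]{1, 3, 6}) on receipt and onComplete with the same anchors 
on ack/fail, leaving only genuinely in-flight anchors visible to the reset thread.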

Regarding the implementation, I agree. The performance with ConcurrentHashMap 
doesn't look too awful to me, but maybe we can do better. Since we're only 
trying to keep a running count for the anchors, maybe we can push the Map code 
into another thread, and have the critical path just insert increment/decrement 
commands into a queue. I'll take a look at whether this is doable and how it 
performs.
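One possible shape for that, assuming we accept slightly stale counts: the hot 
path only enqueues (anchor, delta) commands, and a single bookkeeping thread 
owns a plain HashMap, avoiding per-key contention on the critical path. This is 
a sketch under those assumptions, not a worked-out design; in Storm the queue 
would presumably be a JCTools MPSC queue rather than the JDK one used here:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch: move the count map off the critical path.
// QueuedAnchorCounter and drain are made-up names.
class QueuedAnchorCounter {
    // Each command is a [anchor, delta] pair.
    private final BlockingQueue<long[]> commands = new LinkedBlockingQueue<>();
    // Owned exclusively by the thread that calls drain(), so no locking needed.
    private final Map<Long, Integer> counts = new HashMap<>();

    // Hot path: executors just enqueue, never touch the map.
    void increment(long anchor) { commands.add(new long[]{anchor, 1}); }
    void decrement(long anchor) { commands.add(new long[]{anchor, -1}); }

    // Bookkeeping/reset thread: apply queued commands before each reset sweep.
    // Keys whose count reaches 0 are removed from the map.
    void drain() {
        long[] cmd;
        while ((cmd = commands.poll()) != null) {
            counts.merge(cmd[0], (int) cmd[1], (a, b) -> a + b == 0 ? null : a + b);
        }
    }

    Set<Long> inFlightAnchors() { return counts.keySet(); }
}
```

The trade-off is that the reset thread sees counts that lag the critical path 
by one drain interval, which should be harmless here since a slightly late 
decrement can at worst cause one spurious timeout reset.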

Regarding an alternative acking design, I'd be happy if we could avoid this 
issue entirely with a new design. Please do describe your thoughts.

> Revising Message Timeouts
> -------------------------
>
>                 Key: STORM-2359
>                 URL: https://issues.apache.org/jira/browse/STORM-2359
>             Project: Apache Storm
>          Issue Type: Sub-task
>          Components: storm-core
>    Affects Versions: 2.0.0
>            Reporter: Roshan Naik
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>         Attachments: STORM-2359-with-auto-reset.ods, STORM-2359.ods
>
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
>  
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
