[ https://issues.apache.org/jira/browse/STORM-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626476#comment-14626476 ]
ASF GitHub Bot commented on STORM-67:
-------------------------------------

Github user bourneagain commented on the pull request:
https://github.com/apache/storm/pull/593#issuecomment-121281726

    Thanks @HeartSaVioR. We can have this merged to master whenever we feel appropriate.

> Provide API for spouts to know how many pending messages there are
> ------------------------------------------------------------------
>
>                 Key: STORM-67
>                 URL: https://issues.apache.org/jira/browse/STORM-67
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: James Xu
>            Assignee: Shyam Rajendran
>              Labels: newbie
>
> https://github.com/nathanmarz/storm/issues/343
> This would be useful in case you want to take special action in the spout,
> like dropping messages.
> -----------------
> Discmt: Hi, I'd like to try to take a crack at this if it's still relevant.
> I'm not exactly sure what it's asking for, though. It seems to me that an
> implementation for knowing how many pending messages there are for a spout
> depends on where the spout is getting its information from, which makes me
> sure I'm missing something.
> -----------------
> revans2: The spout code in backtype/storm/daemon/executor.clj already keeps
> track of the pending tuples if acking is enabled. If acking is disabled,
> nothing is pending.
>
>     (defmethod mk-threads :spout [executor-data task-datas] ...)
>
> defines pending as a RotatingMap which maps all of the Storm-internal tuple
> ids to the message-id objects passed in by the spout when it first emitted
> the tuple. The hardest part should be getting pending to a place where the
> ISpoutOutputCollector implementation, or wherever the API is, can get access
> to it.
> -----------------
> ptgoetz: @Discmt Yes, this is still relevant and would be nice to have.
> The Storm framework asks spouts for tuples by calling the nextTuple() method
> and keeps track of the tuple tree internally. The underlying data source does
> not come into play.
> As implied by @revans2, one approach would be to add a method to
> ISpoutOutputCollector, such as getPendingCount(), that would allow spout
> implementations to query for the pending count (probably returning -1 if
> acking is disabled). The tricky part will likely be bridging the gap between
> executor.clj and the ISpoutOutputCollector implementation(s). I haven't dug
> very deeply into the code, so off-hand I don't know how hard that would be. A
> quick search of the code for TOPOLOGY_MAX_PENDING should point you to some of
> the touch points.
> Also keep in mind the dual meaning of TOPOLOGY_MAX_PENDING. In a standard
> Storm topology it represents the maximum number of outstanding tuples. In a
> Trident topology it represents the maximum number of outstanding batches.
> -----------------
> Discmt: Hey guys. I've been taking time to look into it, and I feel like I
> might have an understanding of what exactly it is I need to do. If what
> @revans2 said is true, and all pending messages are kept within that
> RotatingMap, then this should be somewhat straightforward. I am trying to
> compile my own storm.jar file right now, but I haven't figured out how. I
> tried using build_release.sh in the bin directory, but I had no luck. I also
> tried lein jar.
> -----------------
> xumingming: try the following:
>
>     lein sub install
>     lein install
>
> After these commands are executed, there should be a jar file named
> storm-xxx.jar in $STORM_HOME/target/.
> -----------------
> Discmt: @xumingming Thanks for the advice. I found that I had Leiningen 1,
> but the minimum required is Leiningen 2.
> -----------------
> xumingming: yeah, Storm requires lein 2 to build:
> https://github.com/nathanmarz/storm/blob/master/project.clj#L14
> -----------------
> Discmt: Hi guys. I got my development environment squared away and I can
> properly build releases now. I use the build_release.sh script.
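As an editorial aside: the getPendingCount() method @ptgoetz proposes above did not exist in Storm at the time of this thread. A minimal sketch of the proposed interface addition, with a toy in-memory collector to make the contract concrete (only the method name and the "-1 when acking is disabled" convention come from the discussion; ToyCollector and its ack() helper are hypothetical), might look like:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only, not Storm source: the interface addition proposed in the
// thread, plus a toy implementation illustrating the intended semantics.
interface ISpoutOutputCollector {
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void reportError(Throwable error);
    // Proposed addition: tuples emitted but not yet acked or failed;
    // -1 when acking is disabled, since nothing is tracked as pending.
    long getPendingCount();
}

class ToyCollector implements ISpoutOutputCollector {
    private final boolean ackingEnabled;
    private final List<Object> pending = new ArrayList<>();

    ToyCollector(boolean ackingEnabled) { this.ackingEnabled = ackingEnabled; }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        // Mirror the executor's bookkeeping: anchored tuples become pending.
        if (ackingEnabled && messageId != null) pending.add(messageId);
        return new ArrayList<>();
    }

    // Called when the tuple tree completes; the entry leaves the pending set.
    public void ack(Object messageId) { pending.remove(messageId); }

    @Override
    public void reportError(Throwable error) { error.printStackTrace(); }

    @Override
    public long getPendingCount() { return ackingEnabled ? pending.size() : -1; }
}
```

The -1 return gives callers an unambiguous way to distinguish "nothing pending" from "pending is not tracked at all".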
> I tried making a change the way @ptgoetz and @revans2 had suggested, by
> adding a method to the output collector to return the pending count. I have
> some questions about it.
> I noticed most of the collector implementations rely on a delegate, or
> mediator, which I'm assuming is defined here:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L504-515
> So if I add a method to get the size of pending, defined here
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L408-414
> like so:
>
>     (SpoutOutputCollector.
>      (reify ISpoutOutputCollector
>        (^int getPendingCount [this]
>          (.size pending))
>        (^List emit [this ^String stream-id ^List tuple ^Object message-id]
>          (send-spout-msg stream-id tuple message-id nil))
>        (^void emitDirect [this ^int out-task-id ^String stream-id
>                           ^List tuple ^Object message-id]
>          (send-spout-msg stream-id tuple message-id out-task-id))
>        (reportError [this error]
>          (report-error error))))
>
> I should be good, right? Aside from the collectors of the Trident spouts,
> which may take more research.
> Just so I'm clear: messages are considered pending if they have left the
> spout and are waiting to be "fully processed", as defined here:
> https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing ?
> My last question is: does anyone have any suggestions on what would be a good
> way to test this? i.e. "What kind of topology/scenario should I run?"
> -----------------
> revans2: The code you pasted above is creating a new SpoutOutputCollector
> instance wrapping an ISpoutOutputCollector instance. In order for any Java
> spout to actually get access to the getPendingCount method, you will need to
> modify ISpoutOutputCollector to have that method defined in it, and also
> update SpoutOutputCollector, etc. to implement that method, delegating to
> the ISpoutOutputCollector instance.
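The delegation @revans2 describes can be sketched on the Java side as follows. This is an illustrative reduction showing only the new method, not Storm's actual SpoutOutputCollector source; the real classes also carry emit/emitDirect/reportError:

```java
// Sketch of @revans2's point: the user-facing SpoutOutputCollector must
// forward the new method to the ISpoutOutputCollector the executor builds
// (the reify'd collector that can see the pending RotatingMap).
interface ISpoutOutputCollector {
    long getPendingCount();
}

class SpoutOutputCollector implements ISpoutOutputCollector {
    private final ISpoutOutputCollector delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        this.delegate = delegate;
    }

    // Pure pass-through: the executor-side collector owns the pending map;
    // this class just exposes its count to spout implementations.
    @Override
    public long getPendingCount() { return delegate.getPendingCount(); }
}
```

Because the collector the spout holds is only a wrapper, adding the method to the interface and delegating is all that is needed to bridge executor.clj and user code.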
> You are correct about the definition of pending.
> Testing is a bit more difficult. You could have a spout that sends out
> messages to a bolt that does not process them. You could then verify that
> each time you send out a message the pending count goes up by 1. But to fully
> test it you would need to have some coordination between the spout and the
> bolt. This is not impossible, but you may need to use global values to do so.
> -----------------
> Discmt: @revans2 I got the interface implementation covered. Thank you for
> the testing suggestion and hints on how to perform it. It's incredibly
> helpful. I think that's a good idea, and I'm going to try to do that to test
> it out.
> -----------------
> nathanmarz: There's testing infrastructure already built that can do this
> kind of tracking. The name in the testing code is "tracked topologies". I'm
> traveling right now so I can't give a link, but you should be able to find
> it. It's used quite a bit in the tests of the acking system.
> -----------------
> Discmt: @nathanmarz This is also good news. I'll take a look and use what I
> can there to speed up the process for me.
> -----------------
> Discmt: Hi, I am asking if someone can explain to me how these tracked
> topologies work.
> Particularly, I'm confused about these lines here:
> https://github.com/nathanmarz/storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L224-245
> Looking in integration_test.clj under the function test-acking on line 219,
> one of the spouts is told to feed a tuple (.feed feeder1 [1]). Then
> tracked-wait is called on the topology being tracked, to wait for one tuple
> to be emitted from the spout. Afterwards the checker checks whether or not 0
> tuples have been acknowledged, and what I expected is that 1 tuple would have
> been acked, because it was emitted from the spout. However, this is not the
> case.
> Then later on checker1 is called again, expecting there to be one tuple, but
> another one was not fed. Furthermore, feeder2, a second spout, was told to
> feed a tuple, and as expected one was already there when checker2 checked
> for one ack, but this behavior is not the same as the first checker's.
> -----------------
> nathanmarz: tracked-wait waits until the entire tree of processing of the
> tuple has finished. The "checker1" function checks how many tuples on the
> spout have been acked since the last time it was called. The topology is set
> up so that acknowledgement of tuple trees is delayed until bolts have
> received multiple tuples. That's why nothing is acked on the spout after the
> first tuple is emitted, but it is when the second tuple is emitted. If you
> look at the logic of branching-bolt and agg-bolt you'll see why this is the
> case.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
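As an appendix to the thread: revans2's manual-test idea (a spout feeding a bolt that never acks, so every emit raises the pending count by exactly one) can be simulated without running a real topology. All names here are illustrative, and a plain HashMap stands in for the executor's RotatingMap:

```java
import java.util.HashMap;
import java.util.Map;

// Simulation of the suggested test, not a real Storm topology: the "bolt"
// never acks, so every emit leaves one more entry in the pending map.
class PendingCountSimulation {
    // Stand-in for the executor's RotatingMap of internal tuple id -> message id.
    private final Map<Long, Object> pending = new HashMap<>();
    private long nextTupleId = 0;

    // Spout emits an anchored tuple; nothing downstream ever acks it.
    void emit(Object messageId) {
        pending.put(nextTupleId++, messageId);
    }

    // What getPendingCount() would report for this spout.
    long getPendingCount() {
        return pending.size();
    }
}
```

In a real integration test, the tracked-topology infrastructure mentioned by nathanmarz provides this kind of bookkeeping, including waiting for tuple trees to finish before checking counts.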