[ 
https://issues.apache.org/jira/browse/STORM-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626476#comment-14626476
 ] 

ASF GitHub Bot commented on STORM-67:
-------------------------------------

Github user bourneagain commented on the pull request:

    https://github.com/apache/storm/pull/593#issuecomment-121281726
  
    Thanks @HeartSaVioR. We can have this merged to master whenever we feel 
appropriate. 


> Provide API for spouts to know how many pending messages there are
> ------------------------------------------------------------------
>
>                 Key: STORM-67
>                 URL: https://issues.apache.org/jira/browse/STORM-67
>             Project: Apache Storm
>          Issue Type: New Feature
>            Reporter: James Xu
>            Assignee: Shyam Rajendran
>              Labels: newbie
>
> https://github.com/nathanmarz/storm/issues/343
> This would be useful in case you want to take special action in the spout 
> like drop messages
> -----------------
> Discmt: Hi, I'd like to try and take a crack at this if it's still relevant. 
> I'm not exactly sure what it's asking for though. It seems to me an 
> implementation for knowing how many pending messages there are for a spout 
> depends on where the spout is getting its information from, which makes me 
> sure I'm missing something.
> -----------------
> revans2: The spout code in backtype/storm/daemon/executor.clj is already 
> keeping track of the pending tuples if acking is enabled. If acking is 
> disabled nothing is pending.
> defmethod mk-threads :spout [executor-data task-datas]
> defines pending as a RotatingMap which maps all of the storm internal tuple 
> ids to the message id objects passed in by the spout when it first emitted 
> the tuple. The hardest part should be getting pending to a place where the 
> ISpoutOutputCollector implementation, or wherever the API lives, can get access 
> to it.
> -----------------
> ptgoetz: @Discmt Yes, this is still relevant and would be nice to have.
> The Storm framework asks spouts for tuples by calling the nextTuple() method 
> and keeps track of the tuple tree internally. The underlying data source does 
> not come into play.
> As implied by @revans2, one approach would be to add a method to 
> ISpoutOutputCollector such as getPendingCount() that would allow spout 
> implementations to query for the pending count (probably returning -1 if 
> acking is disabled). The tricky part will likely be bridging the gap between 
> executor.clj and the ISpoutOutputCollector implementation(s). I haven't dug 
> very deeply into the code, so off-hand I don't know how hard that would be. A 
> quick search of the code for TOPOLOGY_MAX_PENDING should point you to some of 
> the touch points.
> Also keep in mind the dual meaning of TOPOLOGY_MAX_PENDING. In a standard 
> storm topology it represents the maximum number of outstanding tuples. In a 
> trident topology it represents the maximum number of outstanding batches.
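
To make the proposal above concrete, here is a minimal sketch in plain Java 
with no Storm dependency. Everything in it is an assumption for illustration: 
PendingAwareCollector is a hypothetical stand-in for ISpoutOutputCollector 
with the proposed getPendingCount() added, ToyExecutorCollector uses a plain 
HashMap where the executor uses a RotatingMap, and DroppingSpoutLogic is a 
made-up example of the "drop messages" use case. The -1 return value follows 
the suggestion in the comment, not any confirmed API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ISpoutOutputCollector plus the proposed method.
interface PendingAwareCollector {
    void emit(List<Object> tuple, Object messageId);

    // Number of tracked tuples not yet fully processed, or -1 if acking is off.
    int getPendingCount();
}

// Toy executor-side collector. A HashMap stands in for the RotatingMap that
// maps internal tuple ids to the spout's message ids.
class ToyExecutorCollector implements PendingAwareCollector {
    private final boolean ackingEnabled;
    private final Map<Long, Object> pending = new HashMap<>();
    private long nextTupleId = 0;

    ToyExecutorCollector(boolean ackingEnabled) {
        this.ackingEnabled = ackingEnabled;
    }

    @Override
    public void emit(List<Object> tuple, Object messageId) {
        // In this toy model only tuples emitted with a message id are tracked.
        if (ackingEnabled && messageId != null) {
            pending.put(nextTupleId++, messageId);
        }
    }

    @Override
    public int getPendingCount() {
        return ackingEnabled ? pending.size() : -1;
    }

    // Simulates the tuple tree for tupleId being fully processed.
    void ack(long tupleId) {
        pending.remove(tupleId);
    }
}

// Hypothetical spout-side logic: drop new messages while too many are pending.
class DroppingSpoutLogic {
    private final int maxPending;
    int dropped = 0;

    DroppingSpoutLogic(int maxPending) {
        this.maxPending = maxPending;
    }

    void offer(PendingAwareCollector collector, List<Object> tuple, Object messageId) {
        int pendingCount = collector.getPendingCount();
        // A negative count means acking is disabled, so never drop in that case.
        if (pendingCount >= 0 && pendingCount >= maxPending) {
            dropped++;
            return;
        }
        collector.emit(tuple, messageId);
    }
}
```

A spout written this way only needs the collector it is already handed, which 
is why the comment proposes putting the method on the collector interface.
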
> -----------------
> Discmt: Hey guys. I've been taking time to look into it, and I feel like I 
> might have an understanding of what exactly it is I need to do. If what 
> @revans2 said is true, and all pending messages are kept within that 
> RotatingMap then this should be somewhat straightforward. I am trying to 
> compile my own storm.jar file right now but I haven't figured out how. I 
> tried using build_release.sh in the bin directory, but I had no luck. I also 
> tried using lein jar.
> -----------------
> xumingming: try the following:
> lein sub install
> lein install
> after these commands are executed, there should be a jar file named 
> storm-xxx.jar in $STORM_HOME/target/.
> -----------------
> Discmt: @xumingming Thanks for the advice. I found that I had Leiningen 1, 
> but the minimum required is Leiningen 2.
> -----------------
> xumingming: yeah, storm requires lein 2 to build: 
> https://github.com/nathanmarz/storm/blob/master/project.clj#L14
> -----------------
> Discmt: Hi guys. I got my development environment squared away and I can 
> properly build releases now. I use the build_release.sh script. I tried 
> making a change the way @ptgoetz and @revans2 had suggested by adding a 
> method to the output collector to return the pending count. I have some 
> questions about it.
> I noticed most of the collector implementations rely on a delegate, or 
> mediator, which I'm assuming is defined here: 
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L504-515.
>  So if I add a method to get the size of pending, defined here 
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L408-414,
>  like so:
>                  (SpoutOutputCollector.
>                   (reify ISpoutOutputCollector
>                     (^int getPendingCount[this] 
>                       (.size pending)
>                       )
>                     (^List emit [this ^String stream-id ^List tuple
>                                  ^Object message-id]
>                       (send-spout-msg stream-id tuple message-id nil)
>                       )
>                     (^void emitDirect [this ^int out-task-id ^String stream-id
>                                        ^List tuple ^Object message-id]
>                       (send-spout-msg stream-id tuple message-id out-task-id)
>                       )
>                     (reportError [this error]
>                       (report-error error)
>                       )))))
> I should be good right? Aside from the collectors of the trident spouts which 
> may take more research.
> Just so I'm clear, messages are considered pending if they have left the 
> spout and are waiting to be "fully processed", as defined here: 
> https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing ?
> My last question is: does anyone have any suggestions on what would be a good 
> way to test this? i.e. "What kind of topology/scenario should I run".
> -----------------
> revans2: The code you pasted above is creating a new SpoutOutputCollector 
> instance wrapping an ISpoutOutputCollector instance. In order for any java 
> spout to actually get access to the getPendingCount method, you will need to 
> modify ISpoutOutputCollector to have that method defined in it, and also 
> update SpoutOutputCollector, etc to also implement that method delegating to 
> the ISpoutOutputCollector instance.
> You are correct about the definition of pending.
> Testing is a bit more difficult. You could have a spout that sends out 
> messages to a bolt that does not process them. You could then verify that 
> each time you send out a message the pending count goes up by 1. But to fully 
> test it you would need to have some coordination between the spout and the 
> bolt. This is not impossible but you may need to use global values to do so.
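
The delegation described above, together with the simplest form of the 
suggested test, might look roughly like the following sketch. It is plain Java 
with no Storm dependency, and all three type names are hypothetical: 
CollectorSketch stands in for ISpoutOutputCollector with the new method, 
CollectorWrapperSketch for the SpoutOutputCollector wrapper that spouts 
actually hold, and NeverAckingCollectorSketch for an executor feeding a bolt 
that never processes anything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ISpoutOutputCollector with the new method declared.
interface CollectorSketch {
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

    int getPendingCount();
}

// Hypothetical stand-in for the wrapper class handed to spouts. It implements
// the same interface and forwards every call to the wrapped instance.
class CollectorWrapperSketch implements CollectorSketch {
    private final CollectorSketch delegate;

    CollectorWrapperSketch(CollectorSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return delegate.emit(streamId, tuple, messageId);
    }

    @Override
    public int getPendingCount() {
        // Pure delegation: the wrapper holds no pending state of its own.
        return delegate.getPendingCount();
    }
}

// Toy executor side for the test idea: nothing downstream ever acks, so every
// emitted message id stays pending.
class NeverAckingCollectorSketch implements CollectorSketch {
    private final List<Object> pending = new ArrayList<>();

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        pending.add(messageId);
        // No real task ids exist in this sketch, so return an empty list.
        return new ArrayList<Integer>();
    }

    @Override
    public int getPendingCount() {
        return pending.size();
    }
}
```

With this setup the count seen through the wrapper should rise by exactly one 
per emit. Checking that it falls again after acks is the part that needs the 
spout/bolt coordination mentioned above.
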
> -----------------
> Discmt: @revans2 I got the interface implementation covered. Thank you for 
> the testing suggestion, and hints on how to perform it. It's incredibly 
> helpful. I think that's a good idea, and I'm going to try and do that to test 
> it out.
> -----------------
> nathanmarz: There's testing infrastructure already built that can do this 
> kind of tracking. The name in the testing code is "tracked topologies". I'm 
> traveling right now so can't give a link but you should be able to find it. 
> It's used quite a bit in the tests of the acking system.
> -----------------
> Discmt: @nathanmarz This is also good news. I'll take a look and use what I 
> can there to speed up the process for me.
> -----------------
> Discmt: Hi, I am asking if someone can explain to me how these tracked 
> topologies work.
> Particularly I'm confused about these lines here:
> https://github.com/nathanmarz/storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L224-245
> Looking in integration_test.clj, in the function test-acking on line 219, 
> one of the spouts is told to feed a tuple: (.feed feeder1 [1]). Then 
> tracked-wait is called on the tracked topology to wait for one tuple to be 
> emitted from the spout. Afterwards the checker asserts that 0 tuples have 
> been acked, whereas I expected 1 tuple to have been acked because it was 
> emitted from the spout. Later on, checker1 is called again and expects 1 
> acked tuple, even though feeder1 was not fed another one. Meanwhile feeder2, 
> a second spout, was told to feed a tuple, and as expected checker2 sees 1 
> ack right away, which is not how the first checker behaved.
> -----------------
> nathanmarz: tracked-wait waits until the entire tree of processing of the 
> tuple has finished. The "checker1" function checks how many tuples on the 
> spout have been acked since last time it was called. The topology is set up 
> so that acknowledgement of tuple trees is delayed until bolts have received 
> multiple tuples. That's why nothing is acked on the spout after the first 
> tuple is emitted, but it is when the second tuple is emitted. If you look at 
> the logic of branching-bolt and agg-bolt you'll see why this is the case.
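
The behavior described in that last comment can be imitated with a small toy 
model. This is only a sketch under stated assumptions: DelayedAckModel is a 
hypothetical class, it does not reproduce the actual branching-bolt or 
agg-bolt logic, and the batch size of 2 is chosen purely so that the first 
tuple is held back until a second one arrives. It shows the two ideas from the 
explanation: acks are delayed until enough tuples have been received, and a 
checker reports only the acks seen since its previous call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a bolt that holds tuples un-acked until batchSize have arrived,
// plus per-spout checkers that report acks since the last check.
class DelayedAckModel {
    private final int batchSize;
    // Spout ids of tuples received but not yet acked.
    private final List<String> held = new ArrayList<>();
    // Total acks delivered to each spout so far.
    private final Map<String, Integer> totalAcked = new HashMap<>();
    // Total each spout's checker saw on its previous call.
    private final Map<String, Integer> lastSeen = new HashMap<>();

    DelayedAckModel(int batchSize) {
        this.batchSize = batchSize;
    }

    // A spout emits one tuple, which reaches the delaying bolt.
    void feed(String spoutId) {
        held.add(spoutId);
        if (held.size() >= batchSize) {
            // Enough tuples have arrived: ack everything held so far.
            for (String source : held) {
                totalAcked.merge(source, 1, Integer::sum);
            }
            held.clear();
        }
    }

    // Number of acks the given spout received since this was last called.
    int ackedSinceLastCheck(String spoutId) {
        int total = totalAcked.getOrDefault(spoutId, 0);
        int delta = total - lastSeen.getOrDefault(spoutId, 0);
        lastSeen.put(spoutId, total);
        return delta;
    }
}
```

Feeding one tuple from a first spout leaves its checker at 0. Feeding one from 
a second spout then releases both acks, so each checker reports 1 even though 
the first spout was not fed again.
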



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
