There is no such thing for Bolts. The call to Spout.ack(...) happens after Storm retrieved all acks of all (transitively) anchored tuples.
Let's say you have spout -> bolt1 -> bolt2 Spout emit t1 which is processed by bolt1. bolt1 emits t2 (with anchor t1) and acks t1. => there will be no call to Spout.ack(...) yet, because t2 is pending (ie, was not acked). After bolt2 processed and acked t2, the callback to Spout.ack(t1) happens. -Matthias On 09/09/2015 03:55 PM, Nick R. Katsipoulakis wrote: > Thank you Derek for your reply. > > I figured out the error and it was in my code (i was not acking all > tuples properly). I have another question: > > I see that the BaseRichSpout has a callback function called ack(Object > msgId) which is called when a tuple gets acknowledged. Is there similar > functionality for Bolts? I see that the BaseRichBolt does not have one. > > Thanks, > Nick > > On Wed, Sep 9, 2015 at 9:45 AM, Derek Dagit <[email protected] > <mailto:[email protected]>> wrote: > > The metrics used on the UI are aggregated in chunks. > > It could very well be that your code is working perfectly fine, and > there is a threshold of emits/acks/fails that needs to be met before > the numbers show up on the UI. > > Often I will see 0 on the UI until, for example, the number of emits > reaches 20. And very often the numbers will increment by 20s too.-- > Derek > > > ________________________________ > From: Nick R. Katsipoulakis <[email protected] > <mailto:[email protected]>> > To: [email protected] <mailto:[email protected]> > Sent: Wednesday, September 9, 2015 7:52 AM > Subject: Re: UIs ack statistics are not updated > > > > Hello Javier and thank you for your reply. > > I have a question about the Tuple ids. Do they have to be unique? I > am asking because I have many spouts and they might emit identical > tuples in the topology. > > Also, do I have to ack a tuple only in the last bolt that processes > it, so that the tuple tree created is complete? Or, do I have to ack > each received tuple on every bolt? > > Thanks, > Nick > > > > > On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez <[email protected] > <mailto:[email protected]>> wrote: > > If I am reading your code correctly, it seems you're emitting from > the spout without id - therefore, your acking efforts are not being > used. You need to do something like: > >Object id= <anything you like>; > >_collector.emit(id,tuple); > >Regards, > >Javier > >On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" > <[email protected] <mailto:[email protected]>> wrote: > > > >Hello all, > >> > >> > >>I am running a topology for bench marking my cluster. In it, I > anchor tuples and I acknowledge them for exactly-once processing and > in order to see the complete latency metric on the Storm UI. > However, the "Complete Latency" and the "Acked" metric values for my > spouts remain 0 and I guess that this translates to not being > reported properly. > >> > >> > >>My Topology's code is really simple and consists of the following > three classes: > >> > >> > >>public static class TestWordSpout extends BaseRichSpout { > >> > >>SpoutOutputCollector _collector; > >> > >>public void open(@SuppressWarnings("rawtypes") Map conf, > TopologyContext context, SpoutOutputCollector collector) { > >>_collector = collector; > >>} > >>public void nextTuple() { > >>final String[] words = new String[] {"nathan", "mike", "jackson", > "golda", "bertels"}; > >>final Random rand = new Random(); > >>final String word = words[rand.nextInt(words.length)]; > >>Values tuple = new Values(); > >>tuple.add((new Long(System.currentTimeMillis())).toString()); > >>tuple.add(word); > >>_collector.emit(tuple); > >>} > >>public void declareOutputFields(OutputFieldsDeclarer declarer) { > >>String[] schema = { "timestamp", "word" }; > >>declarer.declare(new Fields(schema)); > >>} > >>} > >>My intermediate bolts code is the following: > >>public static class IntermediateBolt extends BaseRichBolt { > >> > >>OutputCollector _collector; > >> > >>@Override > >>public void prepare(@SuppressWarnings("rawtypes") Map conf, > TopologyContext context, OutputCollector collector) { > >>_collector = collector; > >>} > >>@Override > >>public void execute(Tuple tuple) { > >>Values v = new Values(); > >>v.add(tuple.getString(0)); > >>v.add(tuple.getString(1)); > >>_collector.emit(tuple, v); > >>} > >>@Override > >>public void declareOutputFields(OutputFieldsDeclarer declarer) { > >>String[] schema = { "timestamp", "word" }; > >>declarer.declare(new Fields(schema)); > >>} > >>} > >>And finally, my sink bolts (the last bolts in my topology) are the > following: > >> > >> > >>public static class SinkBolt extends BaseRichBolt { > >> > >>OutputCollector _collector; > >> > >>@Override > >>public void prepare(@SuppressWarnings("rawtypes") Map conf, > TopologyContext context, OutputCollector collector) { > >>_collector = collector; > >>} > >>@Override > >>public void execute(Tuple tuple) { > >>_collector.ack(tuple); > >>} > >>@Override > >>public void declareOutputFields(OutputFieldsDeclarer > outputFieldsDeclarer) { > >>String[] schema = {"timestamp", "word"}; > >>outputFieldsDeclarer.declare(new Fields(schema)); > >>} > >>} > >>So, I just have a 3-level topology (spout, intermediate-bolt, > sink-bolt) just to measure my cluster. However, as I mentioned > above, in the UI the "Complete latency" and the "Acked" metrics are > not updated for my spouts. Am I doing something wrong? Please, pay > attention that I ack a tuple only at the SinkBolt. Is this the > reason that I my metrics are not updated? > >> > >> > >>Thanks, > >>Nick > >> > >> > > > -- > > Nikolaos Romanos Katsipoulakis, > University of Pittsburgh, PhD candidate > > > > > -- > Nikolaos Romanos Katsipoulakis, > University of Pittsburgh, PhD candidate
signature.asc
Description: OpenPGP digital signature
