No, there is no such callback for bolts.

Spout.ack(...) is called only after Storm has received the acks for all
tuples that are (transitively) anchored to the spout tuple.

Let's say you have spout -> bolt1 -> bolt2.

The spout emits t1, which is processed by bolt1.
bolt1 emits t2 (anchored to t1) and acks t1.
=> Spout.ack(...) is not called yet, because t2 is still pending
(i.e., it has not been acked).
After bolt2 has processed and acked t2, the Spout.ack(...) callback for t1
happens.
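
To make the ordering concrete, here is a small toy model of that
bookkeeping in plain Java. It is only a sketch and not Storm's API or its
real acker implementation: the class AckTreeSketch and the methods
emitFromSpout, emitAnchored and ack are hypothetical names made up for
illustration. In a real topology the corresponding calls would be
_collector.emit(tuple, values) and _collector.ack(tuple) in the bolts, as in
the code quoted below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: all names here are hypothetical, none of this is Storm code.
class AckTreeSketch {
    // Spout message id -> ids of tuples in its tree that are not acked yet.
    private final Map<String, Set<Integer>> pendingByRoot = new HashMap<>();
    // Pending tuple id -> spout message id of the tree it belongs to.
    private final Map<Integer, String> rootOfTuple = new HashMap<>();
    // Records each simulated Spout.ack(msgId) callback, in order.
    final List<String> spoutAcks = new ArrayList<>();
    private int nextTupleId = 1;

    // The spout emits a root tuple with a message id.
    int emitFromSpout(String msgId) {
        int id = nextTupleId++;
        Set<Integer> pending = new HashSet<>();
        pending.add(id);
        pendingByRoot.put(msgId, pending);
        rootOfTuple.put(id, msgId);
        return id;
    }

    // A bolt emits a new tuple anchored to a tuple that is still pending.
    int emitAnchored(int anchorId) {
        String msgId = rootOfTuple.get(anchorId);
        if (msgId == null) {
            throw new IllegalArgumentException("anchor is not pending: " + anchorId);
        }
        int id = nextTupleId++;
        pendingByRoot.get(msgId).add(id);
        rootOfTuple.put(id, msgId);
        return id;
    }

    // A bolt acks a tuple; the spout callback fires once the tree is empty.
    void ack(int tupleId) {
        String msgId = rootOfTuple.remove(tupleId);
        if (msgId == null) {
            throw new IllegalArgumentException("tuple is not pending: " + tupleId);
        }
        Set<Integer> pending = pendingByRoot.get(msgId);
        pending.remove(tupleId);
        if (pending.isEmpty()) {
            pendingByRoot.remove(msgId);
            spoutAcks.add(msgId); // stands in for Spout.ack(msgId)
        }
    }

    public static void main(String[] args) {
        AckTreeSketch storm = new AckTreeSketch();
        int t1 = storm.emitFromSpout("msg-1"); // spout emits t1
        int t2 = storm.emitAnchored(t1);       // bolt1 emits t2 anchored to t1
        storm.ack(t1);                         // bolt1 acks t1
        System.out.println(storm.spoutAcks);   // prints [] (t2 is still pending)
        storm.ack(t2);                         // bolt2 acks t2
        System.out.println(storm.spoutAcks);   // prints [msg-1]
    }
}
```

Note that if bolt1 never acked t1 (as in the IntermediateBolt quoted below),
the pending set would never become empty and the spout callback would never
fire in this model either.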


-Matthias



On 09/09/2015 03:55 PM, Nick R. Katsipoulakis wrote:
> Thank you Derek for your reply. 
> 
> I figured out the error and it was in my code (I was not acking all
> tuples properly). I have another question:
> 
> I see that the BaseRichSpout has a callback function called ack(Object
> msgId) which is called when a tuple gets acknowledged. Is there similar
> functionality for Bolts? I see that the BaseRichBolt does not have one.
> 
> Thanks,
> Nick
> 
> On Wed, Sep 9, 2015 at 9:45 AM, Derek Dagit wrote:
> 
>     The metrics used on the UI are aggregated in chunks.
> 
>     It could very well be that your code is working perfectly fine, and
>     there is a threshold of emits/acks/fails that needs to be met before
>     the numbers show up on the UI.
> 
>     Often I will see 0 on the UI until, for example, the number of emits
>     reaches 20.  And very often the numbers will increment by 20s too.
>     --
>     Derek
> 
> 
>     ________________________________
>     From: Nick R. Katsipoulakis
>     To: [email protected] <mailto:[email protected]>
>     Sent: Wednesday, September 9, 2015 7:52 AM
>     Subject: Re: UIs ack statistics are not updated
> 
> 
> 
>     Hello Javier and thank you for your reply.
> 
>     I have a question about the Tuple ids. Do they have to be unique? I
>     am asking because I have many spouts and they might emit identical
>     tuples in the topology.
> 
>     Also, do I have to ack a tuple only in the last bolt that processes
>     it, so that the tuple tree created is complete? Or, do I have to ack
>     each received tuple on every bolt?
> 
>     Thanks,
>     Nick
> 
> 
> 
> 
>     On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez wrote:
> 
>     If I am reading your code correctly, it seems you're emitting from
>     the spout without id - therefore, your acking efforts are not being
>     used. You need to do something like:
>     >Object id= <anything you like>;
>     >_collector.emit(id,tuple);
>     >Regards,
>     >Javier
>     >On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" wrote:
>     >
>     >Hello all,
>     >>
>     >>
>     >>I am running a topology for benchmarking my cluster. In it, I
>     anchor tuples and I acknowledge them for exactly-once processing and
>     in order to see the complete latency metric on the Storm UI.
>     However, the "Complete Latency" and the "Acked" metric values for my
>     spouts remain 0 and I guess that this translates to not being
>     reported properly.
>     >>
>     >>
>     >>My Topology's code is really simple and consists of the following
>     three classes:
>     >>
>     >>
>     >>public static class TestWordSpout extends BaseRichSpout {
>     >>
>     >>SpoutOutputCollector _collector;
>     >>
>     >>public void open(@SuppressWarnings("rawtypes") Map conf,
>     TopologyContext context, SpoutOutputCollector collector) {
>     >>_collector = collector;
>     >>}
>     >>public void nextTuple() {
>     >>final String[] words = new String[] {"nathan", "mike", "jackson",
>     "golda", "bertels"};
>     >>final Random rand = new Random();
>     >>final String word = words[rand.nextInt(words.length)];
>     >>Values tuple = new Values();
>     >>tuple.add((new Long(System.currentTimeMillis())).toString());
>     >>tuple.add(word);
>     >>_collector.emit(tuple);
>     >>}
>     >>public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     >>String[] schema = { "timestamp", "word" };
>     >>declarer.declare(new Fields(schema));
>     >>}
>     >>}
>     >>My intermediate bolts code is the following:
>     >>public static class IntermediateBolt extends BaseRichBolt {
>     >>
>     >>OutputCollector _collector;
>     >>
>     >>@Override
>     >>public void prepare(@SuppressWarnings("rawtypes") Map conf,
>     TopologyContext context, OutputCollector collector) {
>     >>_collector = collector;
>     >>}
>     >>@Override
>     >>public void execute(Tuple tuple) {
>     >>Values v = new Values();
>     >>v.add(tuple.getString(0));
>     >>v.add(tuple.getString(1));
>     >>_collector.emit(tuple, v);
>     >>}
>     >>@Override
>     >>public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     >>String[] schema = { "timestamp", "word" };
>     >>declarer.declare(new Fields(schema));
>     >>}
>     >>}
>     >>And finally, my sink bolts (the last bolts in my topology) are the
>     following:
>     >>
>     >>
>     >>public static class SinkBolt extends BaseRichBolt {
>     >>
>     >>OutputCollector _collector;
>     >>
>     >>@Override
>     >>public void prepare(@SuppressWarnings("rawtypes") Map conf,
>     TopologyContext context, OutputCollector collector) {
>     >>_collector = collector;
>     >>}
>     >>@Override
>     >>public void execute(Tuple tuple) {
>     >>_collector.ack(tuple);
>     >>}
>     >>@Override
>     >>public void declareOutputFields(OutputFieldsDeclarer
>     outputFieldsDeclarer) {
>     >>String[] schema = {"timestamp", "word"};
>     >>outputFieldsDeclarer.declare(new Fields(schema));
>     >>}
>     >>}
>     >>So, I just have a 3-level topology (spout, intermediate-bolt,
>     sink-bolt) just to measure my cluster. However, as I mentioned
>     above, in the UI the "Complete latency" and the "Acked" metrics are
>     not updated for my spouts. Am I doing something wrong? Please, pay
>     attention that I ack a tuple only at the SinkBolt. Is this the
>     reason that my metrics are not updated?
>     >>
>     >>
>     >>Thanks,
>     >>Nick
>     >>
>     >>
> 
> 
>     --
> 
>     Nikolaos Romanos Katsipoulakis,
>     University of Pittsburgh, PhD candidate
> 
> 
> 
> 
> -- 
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
