Thank you Ambud, very comprehensive answer!
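One footnote on the XOR tracking Ambud explains below, since it also comes up later in the thread: each edge in the tuple tree gets a random 64-bit id, the acker XORs ids in as tuples are anchored and XORs them out as they are acked, and the single long reaching zero means the whole tree has completed. Here is a tiny plain-Java sketch (illustrative only, with made-up edge ids; this is not Storm's actual acker code):

    import java.util.Random;

    public class XorAckSketch {
        public static void main(String[] args) {
            Random rnd = new Random();
            long ackVal = 0L;                 // the acker keeps one long per spout tuple

            // spout tuple enters the tree: XOR in its edge id
            long rootEdge = rnd.nextLong();
            ackVal ^= rootEdge;

            // a bolt anchors two child tuples and acks the root
            long childA = rnd.nextLong();
            long childB = rnd.nextLong();
            ackVal ^= childA ^ childB;        // new edges XORed in
            ackVal ^= rootEdge;               // acked edge XORed out

            // downstream bolts ack the children, in any order
            ackVal ^= childB;
            ackVal ^= childA;

            // zero means the whole tree is fully acked, regardless of its size
            System.out.println(ackVal == 0);  // prints true
        }
    }

Because XOR is order-independent, the acker never needs to know the shape of the tree up front, which is also the short answer to the "how does the spout know which bolts will receive the tuple" question further down in the thread: it doesn't have to.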
On Wed, Sep 14, 2016 at 9:55 PM, Ambud Sharma <asharma52...@gmail.com> wrote:

> Two things here, extending what Ravi talked about:
>
> 1. You fail tuples either explicitly or they time out, as an indicator of a recoverable issue in the topology.
>
> If the error is not recoverable, don't fail the tuple; ack it and forward the error to another bolt so you can record it somewhere for further investigation, e.g. Kafka (we have a Kafka topic for this).
>
> 2. Real-time processing sometimes means you have to worry about latencies at the nanosecond level, which means a fail-fast strategy must be used. Point-to-point failure handling at the granularity of a single tuple can be implemented using transactions with a batch size of 1. This will slow the topology down substantially. You can try an implementation yourself and see.
>
> The XOR-based tuple tree is a genius innovation from Nathan Marz for doing tuple tracking very, very fast with predictable memory use. Regardless of how many hops your tuple goes through, Storm uses 20 bytes to track it.
>
> ####################
>
> Now about exactly-once processing. There is no such thing as exactly-once processing unless you use transactions with a batch size of 1 (including Trident).
>
> What topology developers should focus on is idempotent processing!
>
> What does that mean? Idempotent processing means that if your tuple were replayed, the result would not change. So if you are using Trident micro-batching, or you wrote your own micro-batching in Storm, the net result is that in case of failures your tuples will replay, but you are okay with that since the net result will be the same.
>
> Trident will not process the next batch until the current one is processed. That means the entire batch has to be handled via rollback transactions (as in, you flush to the db at the end of the batch) or, better, written to the db in an idempotent manner where each tuple has an id such that writing it again just rewrites the same info.
>
> Most modern data stores have the concept of a key that can be used for this, e.g. Elasticsearch document id, HBase row key, MySQL primary key, etc.
>
> Now, how to get a UUID for the tuple?
> 1. Handle it in your application logic if you already know what makes an event unique.
> 2. Worry about it from Kafka onwards (we do this): use partition id + offset + event timestamp (inside the event payload) as the UUID.
> 3. MD5 the payload of the event (there is a risk of collision here, depending on your event volume and application logic).
>
> For things like unique counting, you can use an in-memory approach like we did (Hendrix) or use something like Redis with structures like sets and HyperLogLog.
>
> Thanks,
> Ambud
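To make the idempotent-write advice above concrete, here is a rough Java sketch of deriving a deterministic event id from Kafka coordinates and doing a keyed upsert. The names (IdempotentWriterSketch, eventId, upsert) are made up for illustration, and a plain map stands in for whatever keyed store you use (Elasticsearch document id, HBase row key, MySQL primary key):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class IdempotentWriterSketch {

        // deterministic id: the same Kafka record always yields the same key,
        // so a replayed tuple overwrites instead of duplicating
        static String eventId(String topic, int partition, long offset, long eventTimestamp) {
            return topic + "-" + partition + "-" + offset + "-" + eventTimestamp;
        }

        // stand-in for any keyed store
        static final Map<String, String> store = new ConcurrentHashMap<>();

        static void upsert(String id, String payload) {
            store.put(id, payload);  // writing the same id twice leaves exactly one record
        }

        public static void main(String[] args) {
            String id = eventId("events", 3, 42L, 1473811200000L);
            upsert(id, "{\"user\":\"a\",\"action\":\"click\"}");
            upsert(id, "{\"user\":\"a\",\"action\":\"click\"}");  // simulated replay
            System.out.println(store.size());  // prints 1: the replay was a no-op
        }
    }

On a replay the same id is produced, so the second write is a harmless overwrite rather than a duplicate row.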
> On Sep 14, 2016 1:38 AM, "Cheney Chen" <tbcql1...@gmail.com> wrote:
>
>> Thank you guys for the discussion.
>>
>> What if I want exactly-once processing for all nodes (bolts), even when failures happen? Will Trident be the one?
>>
>> On Wed, Sep 14, 2016 at 3:49 PM, Ravi Sharma <ping2r...@gmail.com> wrote:
>>
>>> Hi T.I.,
>>> A few things on why the spout is responsible for replay rather than the various bolts:
>>>
>>> 1. Ack and fail messages carry only a message ID. Usually your spout generates the message ID and knows which tuple/message is linked to it (via the source, i.e. JMS etc.). When an ack or fail arrives, the spout can do various things, like delete the message from the queue on ack or put it in some dead-letter queue on fail. An intermediate bolt won't know what message it sent unless you implement something of your own. Technically you could delete the message from JMS in a bolt, but then your whole topology knows where the data comes from; what if tomorrow you start processing data from JMS, an HTTP REST service, a database, a file system, etc.?
>>>
>>> 2. BoltB fails and tells BoltA; BoltA retries 3 times and it fails 3 times. Now what should BoltA do? Send it to another bolt (say a BoltPreA that sits between it and the spout) or send it to the spout? If it sends it to BoltPreA, then BoltPreA will retry 3 times (3 is just an example, consider it N), and for each of those retries BoltA will retry 3 more times, so 9 retries in total. In general, with TB bolts between the spout and the failing bolt and TR retries each, the total grows like TR + TR^2 + ... + TR^TB. And if BoltA sends the failure back to the spout, we can equally argue: why not send it to the spout from BoltB directly? As a framework, Storm shouldn't have to know whether BoltA or BoltB is the expensive one.
>>>
>>> 3. Failure scenarios are supposed to be really, really rare, and if your database is down (meaning 100% of tuples will fail), then performance won't be your only concern; your concern will be to bring the database back up and reprocess all the failed tuples.
>>>
>>> 4. You would also have to implement retry logic in every bolt. Currently it lives in only one place.
>>>
>>> *There is one thing I am looking forward to from Storm: informing the spout about what kind of failure it was.* I.e. if it was a ConnectionTimeout or ReadTimeout etc., a retry may succeed. But if it was a NullPointerException (Java world), I know the data being expected is not there and my code is not handling that scenario, so either I have to change the code or ask the data provider to send that field; a retry won't help.
>>>
>>> Currently the only way to do this is to use an outside datastore like Redis: whichever bolt fails adds a key with the messageId and the exception/error detail to Redis before calling fail, and the spout then reads that data from Redis using the messageId it receives in the fail call and decides whether to retry. I would usually create two wrappers, a retryable exception and a *non*-retryable exception, so each bolt can indicate whether a retry can help. It's up to you where you put this decision-making logic.
>>>
>>> Thanks,
>>> Ravi.
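And a small sketch of the retryable / non-retryable split Ravi describes above. Everything here is illustrative: the class names are made up, a plain map stands in for Redis, and in a real topology recordFailure would be called from the bolt's execute() just before collector.fail(tuple), while shouldReplay would be called from the spout's fail(msgId):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class RetryDecisionSketch {

        // the two wrapper types: bolts classify their own failures
        static class RetryableException extends RuntimeException {
            RetryableException(String msg) { super(msg); }
        }
        static class NonRetryableException extends RuntimeException {
            NonRetryableException(String msg) { super(msg); }
        }

        // stand-in for Redis: messageId -> error detail written by the failing bolt
        static final Map<String, String> errorStore = new ConcurrentHashMap<>();

        // what a bolt would do before failing the tuple
        static void recordFailure(String messageId, RuntimeException e) {
            String kind = (e instanceof RetryableException) ? "RETRYABLE" : "NON_RETRYABLE";
            errorStore.put(messageId, kind + ": " + e.getMessage());
        }

        // what the spout would do on fail(msgId) to decide about a replay
        static boolean shouldReplay(String messageId) {
            String detail = errorStore.get(messageId);
            return detail != null && detail.startsWith("RETRYABLE");
        }

        public static void main(String[] args) {
            recordFailure("msg-1", new RetryableException("connection timeout"));
            recordFailure("msg-2", new NonRetryableException("missing mandatory field"));
            System.out.println(shouldReplay("msg-1"));  // true  -> replay
            System.out.println(shouldReplay("msg-2"));  // false -> dead-letter / error topic
        }
    }

Keeping the classification next to the error detail also lets the spout route non-retryable failures straight to a dead-letter queue or error topic instead of replaying them.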
>>> On Wed, Sep 14, 2016 at 6:43 AM, Tech Id <tech.login....@gmail.com> wrote:
>>>
>>>> Thanks Ambud,
>>>>
>>>> I did read some very good things about the acking mechanism in Storm, but I am not sure it explains why point-to-point checking is expensive.
>>>>
>>>> Consider the example: Spout --> BoltA --> BoltB.
>>>>
>>>> If BoltB fails, it will report failure to the acker.
>>>> If the acker can ask the spout to replay, then why can't the acker ask the parent of BoltB to replay at that point?
>>>> I don't think keeping track of a bolt's parent would be expensive.
>>>>
>>>> On a related note, I am a little confused about the statement "When a new tupletree is born, the spout sends the XORed edge-ids of each tuple recipient, which the acker records in its pending ledger" in Acking-framework-implementation.html <http://storm.apache.org/releases/current/Acking-framework-implementation.html>.
>>>> How does the spout know beforehand which bolts will receive the tuple? Bolts forward tuples to other bolts based on groupings and dynamically generated fields. How does the spout know what fields will be generated and which bolts will receive the tuples? And if it does not know that, how can it send the XOR of each tuple recipient in a tuple's path, given that each tuple's path will be different (I think, not sure though)?
>>>>
>>>> Thx,
>>>> T.I.
>>>>
>>>> On Tue, Sep 13, 2016 at 6:37 PM, Ambud Sharma <asharma52...@gmail.com> wrote:
>>>>
>>>>> Here is a post on it: https://bryantsai.com/fault-tolerant-message-processing-in-storm/.
>>>>>
>>>>> Point-to-point tracking is expensive unless you are using transactions. Flume does point-to-point transfers using transactions.
>>>>>
>>>>> On Sep 13, 2016 3:27 PM, "Tech Id" <tech.login....@gmail.com> wrote:
>>>>>
>>>>>> I agree with this statement about code/architecture, but in the case of some system outages, like one of the endpoints (Solr, Couchbase, Elasticsearch etc.) being down temporarily, a very large number of other fully functional and healthy systems will receive a large number of duplicate replays (especially in high-throughput topologies).
>>>>>>
>>>>>> If you can elaborate a little more on the performance cost of tracking tuples, or point to a document covering it, that will be of great help.
>>>>>>
>>>>>> Best,
>>>>>> T.I.
>>>>>>
>>>>>> On Tue, Sep 13, 2016 at 12:26 PM, Hart, James W. <jwh...@seic.com> wrote:
>>>>>>
>>>>>>> Failures should be very infrequent; if they are not, then rethink the code and architecture. The performance cost of tracking tuples in the way that would be required to replay at the point of failure is large; basically, that method would slow everything way down for very infrequent failures.
>>>>>>>
>>>>>>> *From:* S G [mailto:sg.online.em...@gmail.com]
>>>>>>> *Sent:* Tuesday, September 13, 2016 3:17 PM
>>>>>>> *To:* user@storm.apache.org
>>>>>>> *Subject:* Re: How will storm replay the tuple tree?
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am a little curious to know why we begin at the spout level for case 1.
>>>>>>>
>>>>>>> If we replay at the failing bolt's parent level (BoltA in this case), it should be more performant due to a decrease in duplicate processing (as compared to whole tuple-tree replays).
>>>>>>>
>>>>>>> Only if BoltA itself crashes while replaying should the spout receive this as a failure and the whole tuple tree be replayed.
>>>>>>>
>>>>>>> This saving in duplicate processing would be even more visible with several layers of bolts.
>>>>>>>
>>>>>>> I am sure there is a good reason to replay the whole tuple tree, and I would like to know it.
>>>>>>>
>>>>>>> Thanks
>>>>>>> SG
>>>>>>>
>>>>>>> On Tue, Sep 13, 2016 at 10:22 AM, P. Taylor Goetz <ptgo...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Cheney,
>>>>>>>
>>>>>>> Replays happen at the spout level. So if there is a failure at any point in the tuple tree (the tuple tree being the anchored emits; unanchored emits don't count), the original spout tuple will be replayed. So the replayed tuple will traverse the topology again, including unanchored points.
>>>>>>>
>>>>>>> If an unanchored tuple fails downstream, it will not trigger a replay.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> -Taylor
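Since the anchored vs. unanchored distinction is what the two original scenarios hinge on, here is a minimal bolt sketch against the Storm 1.0.x API showing both emit forms. The class BoltA and the field names are made up to mirror the example topology; treat it as a sketch rather than a recommendation of one form over the other:

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // BoltA in the S (spout) --> BoltA --> BoltB example
    public class BoltA extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);

            // anchored emit: the new tuple is tied to 'input', so a failure or
            // timeout in BoltB makes the spout replay the original tuple (scenario 1)
            collector.emit(input, new Values(word.toUpperCase()));

            // unanchored emit (scenario 2): not part of the tuple tree, so a
            // downstream failure of this tuple never triggers a replay
            // collector.emit(new Values(word.toUpperCase()));

            collector.ack(input);  // ack the input once it has been handled
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

With the anchored emit, a downstream failure fails the original spout tuple and the whole tree is replayed from the spout, as Taylor describes; with the unanchored emit, no replay is triggered for that branch.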
>>>>>>> On Sep 13, 2016, at 4:42 AM, Cheney Chen <tbcql1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> We're using storm 1.0.1, and I'm checking through http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
>>>>>>>
>>>>>>> Got questions for the two scenarios below.
>>>>>>> Assume topology: S (spout) --> BoltA --> BoltB
>>>>>>>
>>>>>>> 1. S: anchored emit, BoltA: anchored emit
>>>>>>> Suppose BoltB's processing fails w/ ack. What will the replay be: will it execute both BoltA and BoltB, or only the failed BoltB processing?
>>>>>>>
>>>>>>> 2. S: anchored emit, BoltA: unanchored emit
>>>>>>> Suppose BoltB's processing fails w/ ack. A replay will not happen, correct?

--
Regards,
Qili Chen (Cheney)

E-mail: tbcql1...@gmail.com
MP: (+1) 4086217503