FYI: Issue STORM-1742 <https://issues.apache.org/jira/browse/STORM-1742> and related pull request (WIP) #1379 <https://github.com/apache/storm/pull/1379> are available.
I'm also done with the functional test, so you can easily see what I'm claiming.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Apr 29, 2016 at 4:59 PM, Jungtaek Lim <[email protected]> wrote:

> Sorry, a correction, since my wording may confuse someone:
>
> Under that circumstance there's no issue with keeping it as is, since *users
> normally don't implement ack() / fail() as long-blocking methods*.
>
> On Fri, Apr 29, 2016 at 4:56 PM, Jungtaek Lim <[email protected]> wrote:
>
>> Cody,
>> Thanks for joining the conversation.
>>
>> If my understanding is right, the way JStorm handles complete latency is
>> the same as what Apache Storm currently does.
>> Please refer to
>> https://github.com/apache/storm/blob/master/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L545
>>
>> What I really want to address is when, and in which component, the
>> completion timestamp gets decided.
>>
>> If my understanding is right, JStorm uses separate threads in the Spout:
>> one is responsible for outgoing tuples, and the other is responsible for
>> receiving / handling incoming tuples. Under that model there's no issue
>> with keeping it as is, since ack() / fail() are normally not implemented
>> as long-blocking methods.
>>
>> But many users implement nextTuple() to sleep for a long time to throttle
>> themselves (yes, I was one of them), and that decision makes tuples from
>> the Acker wait for that amount of time as well. There are some exceptions:
>> when throttling is on (via backpressure), or when the count of pending
>> tuples is over max spout pending.
>>
>> So I guess the same issue persists on JStorm with a single-thread-mode
>> Spout.
>>
>> Please let me know if I'm missing something.
>>
>> Thanks!
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Fri, Apr 29, 2016 at 4:32 PM, Cody Innowhere <[email protected]> wrote:
>>
>>> What we do in JStorm is set the timestamp of a tuple before a spout
>>> sends it to downstream bolts; then in the spout's ack/fail method we get
>>> the current timestamp and, by subtracting the original ts, get the
>>> process latency. Note this delta includes the network cost from spouts
>>> to bolts, ser/deser time, bolt process time, and the network cost from
>>> the acker back to the original spout, i.e., it's almost the whole life
>>> cycle of the tuple.
>>>
>>> I'm adding this while porting executor.clj. This way we don't need to
>>> care about the time-sync problem, since both timestamps are taken on
>>> the spout's machine.
>>>
>>> On Fri, Apr 29, 2016 at 11:18 AM, Jungtaek Lim <[email protected]>
>>> wrote:
>>>
>>> > One way to confirm my assumption is valid: we could use the
>>> > sojourn_time_ms currently provided in the queue metrics.
>>> >
>>> > We could look at sojourn_time_ms in the '__receive' metrics of the
>>> > Spout component to verify how long messages from the Acker wait in
>>> > the Spout's receive queue.
>>> >
>>> > We could also estimate the "waiting time in the Spout's transfer
>>> > queue" from sojourn_time_ms in the '__send' metrics of the Spout
>>> > component, and the "waiting time of ACK_INIT in the Acker's receive
>>> > queue" from sojourn_time_ms in the '__receive' metrics of the Acker
>>> > component.
>>> >
>>> > Since I don't have clusters/topologies for a normal use case I'm not
>>> > sure what the values usually are, but at least in metrics from
>>> > ThroughputVsLatency, sojourn_time_ms in '__send' of the Spout is
>>> > often close to 0, and sojourn_time_ms in '__receive' of the Acker is
>>> > less than 2ms.
>>> > If the transfer latency of the ACK_INIT message is also tiny, the sum
>>> > of latencies missed by option 2 would be tiny too, maybe less than
>>> > 5ms (just an assumption).
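>>> >
>>> > In case anyone wants to log these values, below is a minimal sketch
>>> > of a metrics consumer. It assumes the stock IMetricsConsumer
>>> > interface and that the queue metric's value is a map containing
>>> > 'sojourn_time_ms'; the class name and output format are just
>>> > illustrative.
>>> >
>>> >     import java.util.Collection;
>>> >     import java.util.Map;
>>> >     import org.apache.storm.metric.api.IMetricsConsumer;
>>> >     import org.apache.storm.task.IErrorReporter;
>>> >     import org.apache.storm.task.TopologyContext;
>>> >
>>> >     // Logs only sojourn_time_ms of the __receive / __send queue metrics.
>>> >     public class SojournTimeLogger implements IMetricsConsumer {
>>> >         @Override
>>> >         public void prepare(Map stormConf, Object registrationArgument,
>>> >                             TopologyContext context, IErrorReporter reporter) {
>>> >         }
>>> >
>>> >         @Override
>>> >         public void handleDataPoints(TaskInfo task, Collection<DataPoint> points) {
>>> >             for (DataPoint dp : points) {
>>> >                 if ("__receive".equals(dp.name) || "__send".equals(dp.name)) {
>>> >                     // the queue metric's value is a map of queue stats
>>> >                     Object sojourn = ((Map) dp.value).get("sojourn_time_ms");
>>> >                     System.out.printf("%s[%d] %s sojourn_time_ms=%s%n",
>>> >                             task.srcComponentId, task.srcTaskId, dp.name, sojourn);
>>> >                 }
>>> >             }
>>> >         }
>>> >
>>> >         @Override
>>> >         public void cleanup() {
>>> >         }
>>> >     }
>>> >
>>> > It could then be registered on the topology config with something
>>> > like conf.registerMetricsConsumer(SojournTimeLogger.class, 1);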
>>> >
>>> > I'd really like to see those metrics values (including sojourn_time_ms
>>> > in '__receive' of Bolts) from various live topologies which handle
>>> > normal use cases, to make my assumption solid. Please share if you're
>>> > logging those metrics.
>>> >
>>> > I'll try to go with 2) first, but I'm still open to any ideas /
>>> > opinions / objections.
>>> >
>>> > Thanks,
>>> > Jungtaek Lim (HeartSaVioR)
>>> >
>>> > On Fri, Apr 29, 2016 at 9:38 AM, Jungtaek Lim <[email protected]> wrote:
>>> >
>>> > > Roshan,
>>> > >
>>> > > Thanks for sharing your thoughts. I'm in favor of 1); that's what my
>>> > > sketch is trying to achieve.
>>> > >
>>> > > If we agree to go with 1), IMO the options I stated are clear. Let
>>> > > me elaborate.
>>> > >
>>> > > The root tuple is created in the "Spout", and by the definition of
>>> > > 'complete latency' the tuple tree is considered complete in the
>>> > > "Acker". Since the start point and the end point occur in different
>>> > > components, we have to tolerate either the "latency of handling
>>> > > ACK_INIT between Spout and Acker" (which moves the start point to
>>> > > the Acker) or the "time variation between the machine the Spout is
>>> > > running on and the machine the Acker is running on". I think there's
>>> > > no way to avoid both, so we should just choose whichever is smaller
>>> > > and hence easier to ignore. I agree it could feel tricky for us.
>>> > >
>>> > > I found some answers / articles claiming there can be
>>> > > sub-millisecond precision within the same LAN if machines sync from
>>> > > the same NTP server, and other articles claiming hundreds of
>>> > > milliseconds of drift, which is not acceptable to tolerate.
>>> > > I guess Storm doesn't currently require machines to be time-synced,
>>> > > so it would be a new requirement for setting up a cluster.
>>> > >
>>> > > The latency of handling ACK_INIT between Spout and Acker depends on
>>> > > the hardware / cluster configuration, but normally we place machines
>>> > > in the same rack or on the same switch, or at least group them into
>>> > > the same LAN, which shows low latency.
>>> > > So it comes down to the "waiting time in the Spout's transfer queue"
>>> > > and the "waiting time of ACK_INIT in the Acker's receive queue". But
>>> > > if we don't want to dig in too deeply, I guess this would be fine
>>> > > for the normal situation, since the Acker is lightweight and should
>>> > > keep up with the traffic.
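>>> > >
>>> > > To make option 2 concrete, here's a rough sketch of the flow. This
>>> > > is not the actual Acker code; the class and method names are made up
>>> > > for illustration.
>>> > >
>>> > >     // Option 2 sketch: both timestamps come from the Acker's own
>>> > >     // clock, so no cross-machine time sync is needed, but the
>>> > >     // Spout -> Acker ACK_INIT leg is excluded from the measurement.
>>> > >     abstract class Option2AckerSketch {
>>> > >         static class PendingTree {
>>> > >             long ackerStartMs; // stamped when ACK_INIT reaches the Acker
>>> > >             long ackVal;       // XOR of anchor values; 0 == tree complete
>>> > >         }
>>> > >
>>> > >         void onAckInit(PendingTree tree) {
>>> > >             // (the real Acker also folds the ACK_INIT ack val into ackVal)
>>> > >             tree.ackerStartMs = System.currentTimeMillis();
>>> > >         }
>>> > >
>>> > >         void onAck(Object rootId, PendingTree tree, long anchorVal) {
>>> > >             tree.ackVal ^= anchorVal;
>>> > >             if (tree.ackVal == 0) {
>>> > >                 // tuple tree fully processed; compute the delta here
>>> > >                 long deltaMs = System.currentTimeMillis() - tree.ackerStartMs;
>>> > >                 sendAckToSpout(rootId, deltaMs);
>>> > >             }
>>> > >         }
>>> > >
>>> > >         // placeholder for shipping the ack (with the delta) to the
>>> > >         // Spout, which reports it as complete latency without
>>> > >         // consulting its own clock
>>> > >         abstract void sendAckToSpout(Object rootId, long completeLatencyMs);
>>> > >     }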
>>> > >
>>> > > - Jungtaek Lim (HeartSaVioR)
>>> > >
>>> > > On Fri, Apr 29, 2016 at 5:41 AM, Roshan Naik <[email protected]> wrote:
>>> > >
>>> > >> IMO, avoiding the time variation on machines makes total sense. But
>>> > >> I feel that this is a tricky question.
>>> > >>
>>> > >> A couple more thoughts:
>>> > >>
>>> > >> 1) As per
>>> > >> http://storm.apache.org/releases/current/Guaranteeing-message-processing.html
>>> > >>
>>> > >> "Storm can detect when the tree of tuples is fully processed and
>>> > >> can ack or fail the spout tuple appropriately."
>>> > >>
>>> > >> That seems to indicate that when the ACKer has received all the
>>> > >> necessary acks, it considers the tuple fully processed. If we go by
>>> > >> that, and we define complete latency as the time taken to fully
>>> > >> process a tuple, then it is not necessary to include the time it
>>> > >> takes for the ACK to be delivered to the spout.
>>> > >>
>>> > >> 2) If you include the time it takes to deliver the ACK to the
>>> > >> spout, then we also need to wonder whether we should include the
>>> > >> time that the spout takes to process the ack() call. I am unclear
>>> > >> what it means for the idea of 'fully processed' if spout.ack()
>>> > >> throws an exception. Here you can compute the delta either
>>> > >> immediately before OR immediately after ack() is invoked on the
>>> > >> spout.
>>> > >>
>>> > >> The benefit of including the spout's ack() processing time is that
>>> > >> any optimizations/inefficiencies in the spout's ack()
>>> > >> implementation will be detectable.
>>> > >>
>>> > >> I wonder if we should split this into two different metrics:
>>> > >>
>>> > >> - "delivery latency" (which ends once the ACKer receives the last
>>> > >> ACK from a bolt) and
>>> > >> - "complete latency" which includes ack() processing time by the
>>> > >> spout
>>> > >>
>>> > >> -roshan
>>> > >>
>>> > >> On 4/28/16, 8:59 AM, "Jungtaek Lim" <[email protected]> wrote:
>>> > >>
>>> > >> > Hi devs,
>>> > >> >
>>> > >> > While thinking about metrics improvements, I doubt many users
>>> > >> > know what 'complete latency' exactly is. In fact, it's somewhat
>>> > >> > complicated, because additional waiting time can be added to
>>> > >> > complete latency due to the single-threaded event loop model of
>>> > >> > the spout.
>>> > >> >
>>> > >> > Long-running nextTuple() / ack() / fail() can affect complete
>>> > >> > latency, but it happens behind the scenes. No latency information
>>> > >> > is provided, and some users don't even know about this
>>> > >> > characteristic. Moreover, calling nextTuple() can be skipped due
>>> > >> > to max spout pending, which makes it even harder to guess, even
>>> > >> > if an avg. latency of nextTuple() were provided.
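>>> > >> >
>>> > >> > As a hypothetical example of the pattern I mean (I've written
>>> > >> > spouts like this myself), the sleep below also delays handling of
>>> > >> > ack / fail messages coming back from the Acker, since they share
>>> > >> > one event loop:
>>> > >> >
>>> > >> >     import java.util.Map;
>>> > >> >     import org.apache.storm.spout.SpoutOutputCollector;
>>> > >> >     import org.apache.storm.task.TopologyContext;
>>> > >> >     import org.apache.storm.topology.OutputFieldsDeclarer;
>>> > >> >     import org.apache.storm.topology.base.BaseRichSpout;
>>> > >> >     import org.apache.storm.tuple.Fields;
>>> > >> >     import org.apache.storm.tuple.Values;
>>> > >> >
>>> > >> >     // Hypothetical throttling spout: nextTuple(), ack() and
>>> > >> >     // fail() run on one thread, so this sleep also postpones
>>> > >> >     // processing of acks sitting in the receive queue.
>>> > >> >     public class ThrottlingSpout extends BaseRichSpout {
>>> > >> >         private SpoutOutputCollector collector;
>>> > >> >         private long seq = 0;
>>> > >> >
>>> > >> >         public void open(Map conf, TopologyContext context,
>>> > >> >                          SpoutOutputCollector collector) {
>>> > >> >             this.collector = collector;
>>> > >> >         }
>>> > >> >
>>> > >> >         public void nextTuple() {
>>> > >> >             try {
>>> > >> >                 Thread.sleep(500); // naive throttle blocks the event loop
>>> > >> >             } catch (InterruptedException e) {
>>> > >> >                 Thread.currentThread().interrupt();
>>> > >> >             }
>>> > >> >             seq++;
>>> > >> >             collector.emit(new Values(seq), seq); // anchored with message id
>>> > >> >         }
>>> > >> >
>>> > >> >         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>> > >> >             declarer.declare(new Fields("seq"));
>>> > >> >         }
>>> > >> >     }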
>>> > >> >
>>> > >> > I think separation of threads (moving the tuple handler to a
>>> > >> > separate thread, as JStorm provides) would resolve the gap, but
>>> > >> > it requires our spout logic to be thread-safe, so I'd like to
>>> > >> > find a workaround first.
>>> > >> >
>>> > >> > My sketched idea is to let the Acker decide the end time for the
>>> > >> > root tuple. There are then two ways to decide the start time for
>>> > >> > the root tuple:
>>> > >> >
>>> > >> > 1. When the Spout is about to emit ACK_INIT to the Acker (in
>>> > >> > other words, keep it as it is)
>>> > >> >  - The Acker sends the ack / fail message to the Spout with a
>>> > >> > timestamp, and the Spout calculates the time delta.
>>> > >> >  - pros: It's the most accurate way, since it respects the
>>> > >> > definition of 'complete latency'.
>>> > >> >  - cons: The sync of machine time between machines becomes very
>>> > >> > important. Millisecond precision would be required.
>>> > >> > 2. When the Acker receives ACK_INIT from the Spout
>>> > >> >  - The Acker calculates the time delta itself, and sends the ack
>>> > >> > / fail message to the Spout with the delta.
>>> > >> >  - pros: No requirement to sync the time between servers so
>>> > >> > strictly.
>>> > >> >  - cons: It doesn't include the latency to send / receive
>>> > >> > ACK_INIT between the Spout and the Acker.
>>> > >> >
>>> > >> > Sure, we could leave it as is if we decide it doesn't hurt much.
>>> > >> >
>>> > >> > What do you think?
>>> > >> >
>>> > >> > Thanks,
>>> > >> > Jungtaek Lim (HeartSaVioR)
