Yes, I believe so.

2017-08-16 0:34 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
> Awesome, thanks, this makes sense. So here https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L141 it looks like all acks and fails are processed before nextTuple is called. Is that correct?
>
> On Tue, Aug 15, 2017 at 4:33 PM, Stig Rohde Døssing <s...@apache.org> wrote:
>
>> Sure. When the supervisor starts up a worker JVM, the worker process boots up its set of executors here https://github.com/apache/storm/blob/7f33447477dfbf581e9b46feb27c362cc170dc56/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java#L202 (note the main method at the bottom of the file; this is the entry point for worker processes). The loop calling SpoutExecutor.call is here https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L239, which basically just sets up a Java Thread to keep calling the call method until a crash or interrupt happens.
>>
>> The acks or fails are handled as part of call here https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L141. That statement pulls messages off the executor's message queue and calls back to https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L191, which is where acks and fails are handled. Once the messages are handled, control returns to the call method.
>>
>> This might be helpful as a reference too: http://storm.apache.org/releases/2.0.0-SNAPSHOT/Understanding-the-parallelism-of-a-Storm-topology.html
>>
>> 2017-08-15 22:02 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>>
>>> Thanks Stig, this is very helpful. So that call function gets called in a loop from somewhere?
>>> And when there is an ack or fail, do those get handled instead of call? Would you be able to point me toward the source for that as well? Just trying to understand how things work.
>>>
>>> Thanks again!
>>>
>>> On Aug 15, 2017, at 14:48, Stig Rohde Døssing <s...@apache.org> wrote:
>>>
>>> Sure, take a look at https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L140. This function is called repeatedly on spouts to emit new tuples. The wait strategy is used at line 175 of the same file when a call to nextTuple doesn't emit anything. The wait strategy is instantiated here https://github.com/apache/storm/blob/90ca7fa0c8e73a1884c70e2d3da3388b24d13db0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L73. Note that this links to the current master code; the 1.x code is Clojure instead. The equivalent on 1.x is here https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L659.
>>>
>>> I believe you have to do it in code for Java-based topology configurations, but you should take a look at http://storm.apache.org/releases/2.0.0-SNAPSHOT/flux.html, which allows you to specify topology configuration as YAML.
>>>
>>> 2017-08-15 20:36 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>>>
>>>> Also, there's no config file that can do something similar, right? It has to be done in the code?
>>>>
>>>> On Aug 15, 2017, at 14:31, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>
>>>> Thanks Stig, that worked for me!
>>>>
>>>> Another question: how does Storm internally handle this timeout? Is there some source code you can point me to?
>>>>
>>>> On Aug 15, 2017, at 12:15, Stig Rohde Døssing <s...@apache.org> wrote:
>>>>
>>>> I think you need to give the FQCN for SleepSpoutWaitStrategy instead of an instance, since the config must be serializable to JSON. I'm a little surprised you don't get an error when you submit that topology. If you're using the default wait strategy, you can just leave out the TOPOLOGY_SPOUT_WAIT_STRATEGY part.
>>>>
>>>> Here's what works for me (based on the word count topology in storm-starter):
>>>>
>>>>     builder.setSpout("spout", new RandomSentenceSpout(), 5)
>>>>         .addConfiguration(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, TestWait.class.getName())
>>>>         .addConfiguration(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 60_000);
>>>>
>>>> where TestWait is just an inner class like this (purely so I can print the configuration; normally I'd just use the built-in wait strategy):
>>>>
>>>>     public static final class TestWait extends SleepSpoutWaitStrategy {
>>>>
>>>>         @Override
>>>>         public void prepare(Map<String, Object> conf) {
>>>>             super.prepare(conf);
>>>>             LogManager.getLogger(getClass()).error("The sleep backoff is {}",
>>>>                 conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS));
>>>>         }
>>>>     }
>>>>
>>>> When I run the topology I get the following in the log:
>>>>
>>>>     2017-08-15 18:11:56.596 o.a.s.s.WordCountTopology$TestWait main [ERROR] The sleep backoff is 60000
>>>>
>>>> 2017-08-15 18:00 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>>>>
>>>>> In the last line I use addConfigurations.
>>>>>
>>>>> On Aug 15, 2017, at 11:59, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>
>>>>> Hmm okay, that's what I'm trying to do but maybe I'm doing it wrong.
>>>>>
>>>>>     Config config = new Config();
>>>>>     SleepSpoutWaitStrategy strategy = new SleepSpoutWaitStrategy();
>>>>>     config.put(org.apache.storm.Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, strategy);
>>>>>     config.put(org.apache.storm.Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
>>>>>     builder.setSpout(...).addConfiguration(config);
>>>>>
>>>>> On Aug 15, 2017, at 11:51, Stig Rohde Døssing <s...@apache.org> wrote:
>>>>>
>>>>> I think I might have misread the code. It looks like the method I linked does the opposite of what I thought: it keeps the keys listed there and removes only the configuration that is not listed. I would expect using SpoutDeclarer.addConfiguration to work then.
>>>>>
>>>>> 2017-08-15 17:36 GMT+02:00 Mahak Goel <mahakgoe...@gmail.com>:
>>>>>
>>>>>> Text from post.
>>>>>>
>>>>>> 2. Spout wait strategies: There are two situations in which a spout needs to wait. The first is when the max spout pending limit is reached. The second is when nothing is emitted from nextTuple. Previously, Storm would just have that spout sit in a busy loop in those cases. What Storm does in those situations is now pluggable, and the default is now for the spout to sleep for 1 ms. This will cause the spout to use dramatically less CPU when it hits those cases, and it also obviates the need for spouts to do any sleeping in their implementation to be "polite". The wait strategy can be configured with TOPOLOGY_SPOUT_WAIT_STRATEGY and can be configured on a spout by spout basis.
>>>>>> The interface to implement for a wait strategy is backtype.storm.spout.ISpoutWaitStrategy.
>>>>>>
>>>>>> On Aug 15, 2017, at 11:34, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>>
>>>>>> I tried adding TOPOLOGY_SPOUT_WAIT_STRATEGY and TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS in the spout's config, but that didn't seem to have an effect.
>>>>>>
>>>>>> On Aug 15, 2017, at 11:28, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Stig,
>>>>>>
>>>>>> Thank you. However, it looks like from this post there is a way to do it on a per-spout basis: https://groups.google.com/forum/m/#!search/Storm$200.8.1$20released/storm-user/hVbXtBdCkQo
>>>>>>
>>>>>> Do you or does anyone else know if this is still possible? If so, how do I do it?
>>>>>>
>>>>>> On Aug 15, 2017, at 11:14, Stig Rohde Døssing <s...@apache.org> wrote:
>>>>>>
>>>>>> Hi Mahak,
>>>>>>
>>>>>> I haven't checked in any detail, but I suspect there isn't. I'd have said you could set the configuration for the spout via the SpoutDeclarer addConfiguration methods when declaring the spout, but it looks like the wait strategy and backoff are both removed from the component configuration and only read from the topology-level configuration: https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/executor/Executor.java#L431.
>>>>>>
>>>>>> 2017-08-15 16:45 GMT+02:00 Brian Taylor <br...@resolvingarchitecture.com>:
>>>>>>
>>>>>>> Unsubscribe
>>>>>>>
>>>>>>> On Aug 15, 2017, at 10:34 AM, Mahak Goel <mahakgoe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I know I can configure a sleep wait strategy in the defaults.yaml and that will apply to all spouts in the topology.
>>>>>>>> Is there a way to do this on a spout by spout basis? That is, is there a way to configure different times for different spouts?
>>>>>>>>
>>>>>>>> Thanks!
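Stig's corrected reading of Executor.java#L431 (the component configuration is filtered against a list, and the listed keys are kept) implies that a per-spout sleep time set through SpoutDeclarer.addConfiguration should override the topology-level value. The sketch below models that overlay in plain Java so it runs without Storm on the classpath. It is a hypothetical illustration, not Storm's code: the class ComponentConfMerger and both key sets are assumptions; only the two key strings are meant to mirror the values of Storm's TOPOLOGY_SPOUT_WAIT_STRATEGY and TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Storm's implementation): builds the configuration one
 * spout's executor would see by overlaying the spout's own settings on the topology's.
 */
final class ComponentConfMerger {
    // Meant to mirror Storm's Config.TOPOLOGY_SPOUT_WAIT_STRATEGY and
    // Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS values.
    static final String WAIT_STRATEGY = "topology.spout.wait.strategy";
    static final String SLEEP_TIME_MS = "topology.sleep.spout.wait.strategy.time.ms";

    // Assumed, illustrative subset of framework-owned keys.
    static final Set<String> FRAMEWORK_KEYS =
        Set.of(WAIT_STRATEGY, SLEEP_TIME_MS, "topology.workers");

    // Assumed allow-list: framework keys a single component may override.
    static final Set<String> COMPONENT_OVERRIDABLE = Set.of(WAIT_STRATEGY, SLEEP_TIME_MS);

    private ComponentConfMerger() { }

    static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                             Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf);
        for (Map.Entry<String, Object> entry : componentConf.entrySet()) {
            String key = entry.getKey();
            // Drop framework keys that are not on the allow-list; keep everything else.
            boolean dropped = FRAMEWORK_KEYS.contains(key) && !COMPONENT_OVERRIDABLE.contains(key);
            if (!dropped) {
                merged.put(key, entry.getValue()); // component value wins over topology value
            }
        }
        return merged;
    }
}
```

Under this model, two spouts in the same topology can end up with different sleep times, which is what the original question asks for, while a component cannot change topology-wide settings that are not on the list.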
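Stig's advice to pass the FQCN rather than an instance comes down to how the strategy reaches the worker: the configuration must be serializable to JSON, so only a class-name string survives, and the worker then instantiates the class by reflection (roughly what the SpoutExecutor.java#L73 link does). A minimal sketch, assuming a hypothetical PluggableWait interface in place of Storm's real wait-strategy interface; SleepingWait and WaitStrategyFactory are likewise invented for illustration, and only the key strings mirror Storm's config names.

```java
import java.util.Map;

/** Hypothetical stand-in for a pluggable spout wait strategy (not Storm's interface). */
interface PluggableWait {
    void prepare(Map<String, Object> conf);
    void onEmptyEmit(long streak);
}

/** Hypothetical sleep-based strategy that reads the sleep time from the config. */
class SleepingWait implements PluggableWait {
    static final String SLEEP_MS_KEY = "topology.sleep.spout.wait.strategy.time.ms";
    long sleepMs = 1; // same 1 ms default the release notes describe

    public SleepingWait() { }

    @Override
    public void prepare(Map<String, Object> conf) {
        Object v = conf.get(SLEEP_MS_KEY);
        if (v instanceof Number) {
            sleepMs = ((Number) v).longValue();
        }
    }

    @Override
    public void onEmptyEmit(long streak) {
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // let the caller's loop notice the interrupt
        }
    }
}

final class WaitStrategyFactory {
    static final String STRATEGY_KEY = "topology.spout.wait.strategy";

    private WaitStrategyFactory() { }

    /** Instantiates the strategy named in the config from its class-name string. */
    static PluggableWait fromConf(Map<String, Object> conf) {
        Object name = conf.get(STRATEGY_KEY);
        if (!(name instanceof String)) {
            // An object instance would not survive a round trip through JSON.
            throw new IllegalArgumentException(
                STRATEGY_KEY + " must be a fully qualified class name, got: " + name);
        }
        try {
            PluggableWait wait = (PluggableWait) Class.forName((String) name)
                .getDeclaredConstructor().newInstance();
            wait.prepare(conf);
            return wait;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + name, e);
        }
    }
}
```

Rejecting a non-string value up front makes the mistake from earlier in the thread (putting a SleepSpoutWaitStrategy instance into the config) fail loudly in this model instead of being silently ignored.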
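The control flow discussed at the top of the thread can be modeled in a few lines: a thread keeps invoking call until interrupted, each call first drains pending acks and fails from the executor's queue, and only then asks the spout for a new tuple, deferring to the wait strategy when nothing is emitted. This is a simplified, hypothetical model rather than Storm's SpoutExecutor. The names SpoutLoopModel, Spout, IdleStrategy, deliver, and callOnce are invented, and it also folds in the max-spout-pending case from the quoted release notes.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical, simplified model of a spout executor's main loop (not Storm's
 * SpoutExecutor). Each iteration drains pending acks/fails before asking for tuples.
 */
final class SpoutLoopModel {
    /** Minimal stand-in for a spout: nextTuple returns true if it emitted a tuple. */
    interface Spout {
        void ack(Object msgId);
        void fail(Object msgId);
        boolean nextTuple();
    }

    /** Called when an iteration emits nothing; streak counts consecutive empty iterations. */
    interface IdleStrategy {
        void onEmptyEmit(long streak);
    }

    private static final class Feedback {
        final boolean isAck;
        final Object msgId;
        Feedback(boolean isAck, Object msgId) { this.isAck = isAck; this.msgId = msgId; }
    }

    // Thread-safe so other threads may deliver acks/fails while the loop runs.
    private final Queue<Feedback> inbox = new ConcurrentLinkedQueue<>();
    private final Spout spout;
    private final IdleStrategy idle;
    private final int maxPending;
    private int pending = 0;
    private long emptyStreak = 0;

    SpoutLoopModel(Spout spout, IdleStrategy idle, int maxPending) {
        this.spout = spout;
        this.idle = idle;
        this.maxPending = maxPending;
    }

    /** Stands in for an ack or fail message arriving on the executor's queue. */
    void deliver(boolean isAck, Object msgId) {
        inbox.add(new Feedback(isAck, msgId));
    }

    /** One iteration, analogous to one invocation of the executor's call method. */
    void callOnce() {
        // 1. Handle every pending ack/fail first.
        Feedback f;
        while ((f = inbox.poll()) != null) {
            pending = Math.max(0, pending - 1);
            if (f.isAck) {
                spout.ack(f.msgId);
            } else {
                spout.fail(f.msgId);
            }
        }
        // 2. Ask for a new tuple only if the pending limit allows it.
        boolean emitted = pending < maxPending && spout.nextTuple();
        if (emitted) {
            pending++;
            emptyStreak = 0;
        } else {
            // 3. Nothing emitted or limit reached: defer to the wait strategy.
            emptyStreak++;
            idle.onEmptyEmit(emptyStreak);
        }
    }

    /** Keeps calling callOnce on a dedicated thread until that thread is interrupted. */
    Thread start() {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                callOnce();
            }
        }, "spout-executor-model");
        t.start();
        return t;
    }
}
```

In this model both waiting situations from the release notes end in the same idle call, so a sleeping IdleStrategy replaces the old busy loop; such a strategy should restore the interrupt flag when interrupted so the loop in start can exit.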