I am not sure if NimbusClient works well with LocalCluster. My suggestion was based on the assumption that you run in a real cluster.

There is LocalCluster.killTopology(); maybe you should use this method instead of NimbusClient.kill(). With LocalCluster, I usually use the following pattern (which returns nicely):

  LocalCluster lc = new LocalCluster();
  lc.submitTopology(TOPOLOGY_NAME, config, topology);

  Utils.sleep(runtime);
  lc.deactivate(TOPOLOGY_NAME);

  Utils.sleep(10000);
  lc.shutdown();

I use deactivate() to send a flush signal through my topology, too. You could of course replace deactivate() with kill(). Furthermore, instead of Utils.sleep(runtime), you could do a wait-loop checking for a global boolean flag "finished" that gets set by your bolt -- i.e., instead of calling NimbusClient.kill(...) in your "shut-down bolt", just set this global flag to tell the driver to resume.
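For example (an untested sketch -- all names here are placeholders, and a static flag only works because LocalCluster runs the whole topology inside a single JVM):

  import java.util.concurrent.atomic.AtomicBoolean;
  import org.apache.storm.Config;
  import org.apache.storm.LocalCluster;
  import org.apache.storm.generated.StormTopology;
  import org.apache.storm.utils.Utils;

  public class Driver {
      // Flipped by the "shut-down bolt" once all DbBolts have flushed.
      public static final AtomicBoolean finished = new AtomicBoolean(false);

      public static void run(String name, Config config, StormTopology topology)
              throws Exception {
          LocalCluster lc = new LocalCluster();
          lc.submitTopology(name, config, topology);

          while (!finished.get()) { // wait for the signal instead of a fixed sleep
              Utils.sleep(100);
          }

          lc.deactivate(name);  // optional: flush signal through the topology
          Utils.sleep(10000);   // give in-flight tuples time to drain
          lc.shutdown();
      }
  }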
-Matthias

On 05/10/2016 11:29 AM, Navin Ipe wrote:
> Turns out, using nimbus.seeds was sufficient:
>
>   import java.util.Map;
>   import org.apache.storm.generated.Nimbus;
>   import org.apache.storm.utils.NimbusClient;
>   import org.apache.storm.utils.Utils;
>
>   Map conf = Utils.readStormConfig();
>   conf.put("nimbus.seeds", "localhost");
>
>   NimbusClient cc = NimbusClient.getConfiguredClient(conf);
>   Nimbus.Client client = cc.getClient();
>   client.killTopology("MyStorm");
>
> That killed the topology. I am a bit surprised, though. I thought doing
> this would kill the submitted topology and take me to the next line of
> code in main() (i.e., the line after I submitted the topology to Storm).
> Instead, killing the topology stops the entire program in this manner:
>
>   :run FAILED
>
>   FAILURE: Build failed with an exception.
>
>   * What went wrong:
>   Execution failed for task ':run'.
>   > Process 'command
>   '/Library/Java/JavaVirtualMachines/jdk1.8.0_73.jdk/Contents/Home/bin/java''
>   finished with non-zero exit value 1
>
> I guess this is why a Storm topology is meant to run forever. It would
> have been nice, though, if Storm provided a clean way to exit a topology.
> Thanks for all the help, Matthias!

On Tue, May 10, 2016 at 1:57 PM, Matthias J. Sax <mj...@apache.org> wrote:
> My bad. The parameter is called "nimbus.seeds" (formerly "nimbus.host"),
> not "nimbus.leader".
>
> And I guess "build/libs" is not your working directory. (See the IDE
> settings of your run configuration.)
>
> If in doubt, include a System.out.println(new File(".").getAbsolutePath());
> (or similar) in your bolt code to get the working directory.
>
> And check
> https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L349
> and
> https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L452
>
> You can also specify the location of storm.yaml via
>   System.setProperty("storm.conf.file", <your-path>);
> (or -Dstorm.conf.file=<your-path>)
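> For example, both checks in one snippet (an untested sketch; the yaml
> path is a placeholder):
>
>   import java.io.File;
>   import java.util.Map;
>   import org.apache.storm.utils.Utils;
>
>   // Print the worker's working directory -- per the advice above, this
>   // is where Utils.readStormConfig() would look for ./storm.yaml.
>   System.out.println(new File(".").getAbsolutePath());
>
>   // Or point Storm at an explicit config file before reading it:
>   System.setProperty("storm.conf.file", "/path/to/storm.yaml");
>   Map conf = Utils.readStormConfig();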
> -Matthias

On 05/10/2016 06:24 AM, Navin Ipe wrote:
> @Spico: The code as promised:
> http://nrecursions.blogspot.in/2016/05/more-concepts-of-apache-storm-you-need.html#morecreativetopologystructures
>
> @Matthias: Still no luck. I tried this in the bolt code:
>
>   Map conf = Utils.readStormConfig();
>   conf.put("nimbus.leader", "localhost");
>
> I also tried altering the storm.yaml file to have this:
>
>   ########### These MUST be filled in for a storm configuration
>   storm.zookeeper.servers:
>       - "localhost"
>   #   - "server2"
>   nimbus.seeds: ["localhost"]
>
> I am running this on LocalCluster, and strangely, the storm.yaml file is
> in my ~/eclipseworkspace/apache-storm-1.0.0_release/conf/ folder,
> although my project is in the ~/eclipseworkspace/MyStorm folder.
>
> I placed a copy of storm.yaml in my project folder and in the build/libs
> folder. Still no luck. For this person
> <http://stackoverflow.com/questions/36742451/apache-storm-could-not-find-leader-nimbus-from-seed-hosts>,
> it was a port issue. I don't think that's the case for me.
> Is there anything else that could be tried out?

On Mon, May 9, 2016 at 6:18 PM, Matthias J. Sax <mj...@apache.org> wrote:
> Utils.readStormConfig() tries to read "./storm.yaml" from local disk
> (i.e., on the supervisor machine that executes the bolt). Since it uses
> the working directory, I guess it does not find the file, and thus the
> value of "nimbus.host" is not set.
>
> Make sure that storm.yaml is found by the worker, or set nimbus.host
> manually in your bolt code:
>
>   conf.put("nimbus.host", "<your-nimbus-host-name>");
>
> (or "nimbus.leader", which replaces "nimbus.host" in Storm 1.0.0).
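> Put together with the kill call, it might look like this (an untested
> sketch; the host and topology name are placeholders, and
> killTopologyWithOpts is an alternative to killTopology if you want
> Storm to wait before killing):
>
>   import java.util.Map;
>   import org.apache.storm.generated.KillOptions;
>   import org.apache.storm.generated.Nimbus;
>   import org.apache.storm.utils.NimbusClient;
>   import org.apache.storm.utils.Utils;
>
>   Map conf = Utils.readStormConfig();
>   conf.put("nimbus.host", "<your-nimbus-host-name>"); // placeholder
>
>   Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();
>
>   KillOptions opts = new KillOptions();
>   opts.set_wait_secs(10); // let in-flight tuples finish before the kill
>   client.killTopologyWithOpts("MyStorm", opts);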
> -Matthias

On 05/09/2016 12:31 PM, Navin Ipe wrote:
> @Spico: Will share.
>
> The streams implementation is working beautifully.
> Only the topology killing is failing.
>
> Tried:
>
>   Map conf = Utils.readStormConfig();
>   NimbusClient cc = NimbusClient.getConfiguredClient(conf);
>   Nimbus.Client client = cc.getClient();
>   client.killTopology("myStorm");
>
> I get these errors:
>
>   29442 [Thread-32-topologyKillerBolt-executor[16 16]] WARN
>   o.a.s.u.NimbusClient - Ignoring exception while trying to get leader
>   nimbus info from localhost. will retry with a different seed host.
>   java.lang.RuntimeException:
>   org.apache.storm.thrift.transport.TTransportException:
>   java.net.ConnectException: Connection refused
>   Caused by: org.apache.storm.thrift.transport.TTransportException:
>   java.net.ConnectException: Connection refused
>   Caused by: java.net.ConnectException: Connection refused
>   29445 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util -
>   Async loop died!
>   java.lang.RuntimeException:
>   org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find
>   leader nimbus from seed hosts [localhost]. Did you specify a valid list
>   of nimbus hosts for config nimbus.seeds?
>   Caused by: org.apache.storm.utils.NimbusLeaderNotFoundException: Could
>   not find leader nimbus from seed hosts [localhost]. Did you specify a
>   valid list of nimbus hosts for config nimbus.seeds?
>   29462 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util -
>   Halting process: ("Worker died")
>   java.lang.RuntimeException: ("Worker died")
>
> The error is apparently on this line:
>
>   NimbusClient cc = NimbusClient.getConfiguredClient(conf);

On Mon, May 9, 2016 at 3:15 PM, Spico Florin <spicoflo...@gmail.com> wrote:
> Hi!
> You're welcome, Navin. I'm also interested in the solution. Can you
> please share your remarks (and some code :)) after the implementation?
> Thanks.
> Regards,
> Florin

On Mon, May 9, 2016 at 7:20 AM, Navin Ipe <navin....@searchlighthealth.com> wrote:
> @Matthias: That's genius! I didn't know streams and allGroupings could
> be used like that. Given the way Storm introduced tick tuples, it would
> have been nice if Storm had a native technique for doing all this, but
> the ideas you've come up with are extremely good. I am going to try
> implementing them right away.
> Thank you too, Florin!

On Mon, May 9, 2016 at 12:48 AM, Matthias J. Sax <mj...@apache.org> wrote:
> To synchronize this, use an additional "shut-down bolt" with a
> parallelism of one. The "shut-down bolt" must be notified by all
> parallel DbBolts after they have performed their flush. Once all
> notifications are received, there are no in-flight messages, and thus
> the "shut-down bolt" can kill the topology safely.
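> A minimal sketch of such a shut-down bolt (untested; it assumes the
> DbBolts are registered under the component id "DbBolt", that each
> parallel DbBolt emits exactly one notification tuple to this bolt after
> flushing, and that it sets the AtomicBoolean from the driver sketch
> near the top of this thread):
>
>   import java.util.Map;
>   import org.apache.storm.task.TopologyContext;
>   import org.apache.storm.topology.BasicOutputCollector;
>   import org.apache.storm.topology.OutputFieldsDeclarer;
>   import org.apache.storm.topology.base.BaseBasicBolt;
>   import org.apache.storm.tuple.Tuple;
>
>   public class ShutDownBolt extends BaseBasicBolt {
>       private int expected; // number of parallel DbBolt instances
>       private int received; // "flushed" notifications seen so far
>
>       @Override
>       public void prepare(Map conf, TopologyContext context) {
>           // How many DbBolt tasks will send us a notification.
>           expected = context.getComponentTasks("DbBolt").size();
>       }
>
>       @Override
>       public void execute(Tuple tuple, BasicOutputCollector collector) {
>           if (++received == expected) {
>               // All DbBolts flushed: no in-flight data is left, so it
>               // is safe to signal the driver (or kill the topology).
>               Driver.finished.set(true);
>           }
>       }
>
>       @Override
>       public void declareOutputFields(OutputFieldsDeclarer declarer) {}
>   }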
> -Matthias

On 05/08/2016 07:27 PM, Spico Florin wrote:
> Hi!
> There is the solution of sending a "poison pill" message from the
> spout. One bolt will receive your poison pill and will kill the
> topology via the Storm Nimbus API. One potential issue with this
> approach is that, depending on your topology structure -- the
> parallelism of your bolts and the time they need to execute their
> business logic -- the poison pill could be swallowed by the one bolt
> responsible for killing the topology before all the other in-flight
> messages have been processed. The consequence is that you cannot be
> sure that all the messages sent by the spout were processed. Also,
> sharing the total number of sent messages between the executors, in
> order to shut down when all messages have been processed, can be error
> prone, since tuples can be processed many times (depending on your
> message-processing guarantees) or they can fail.
> I could not find a solution for this. Storm is intended to run on
> unbounded data.
> I hope this helps.
> Regards,
> Florin

On Sunday, May 8, 2016, Matthias J. Sax <mj...@apache.org> wrote:
> You can get the number of bolt instances from the TopologyContext that
> is provided in Bolt.prepare().
>
> Furthermore, you could put a loop into your topology, i.e., a bolt
> reads its own output; if you broadcast (i.e., allGrouping) this
> feedback-loop stream, you can let bolt instances talk to each other:
>
>   builder.setBolt("DbBolt", new MyDBBolt())
>          .shuffleGrouping("spout")
>          .allGrouping("DbBolt", "flush-stream");
>
> where "flush-stream" is a second output stream of MyDBBolt(), sending a
> notification tuple after it receives the end-of-stream from the spout;
> furthermore, once a bolt has received the signal via "flush-stream"
> from **all** parallel bolt instances, it can flush to the DB.
>
> Or something like this... Be creative! :)
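> A sketch of how MyDBBolt might declare and use that second stream
> (untested; it assumes the end-of-stream marker reaches every instance,
> e.g. broadcast from the spout, and that data tuples carry a boolean
> "eos" field -- both placeholder choices):
>
>   import java.util.Map;
>   import org.apache.storm.task.OutputCollector;
>   import org.apache.storm.task.TopologyContext;
>   import org.apache.storm.topology.OutputFieldsDeclarer;
>   import org.apache.storm.topology.base.BaseRichBolt;
>   import org.apache.storm.tuple.Fields;
>   import org.apache.storm.tuple.Tuple;
>   import org.apache.storm.tuple.Values;
>
>   public class MyDBBolt extends BaseRichBolt {
>       private OutputCollector collector;
>       private int numInstances; // parallel copies of this bolt
>       private int flushSignals; // notifications seen on "flush-stream"
>
>       @Override
>       public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>           this.collector = collector;
>           numInstances = context.getComponentTasks(context.getThisComponentId()).size();
>       }
>
>       @Override
>       public void execute(Tuple tuple) {
>           if ("flush-stream".equals(tuple.getSourceStreamId())) {
>               // A sibling instance saw end-of-stream; flush once all did.
>               if (++flushSignals == numInstances) flushToDb();
>           } else if (tuple.getBooleanByField("eos")) { // placeholder marker
>               // Tell all instances (including ourselves) via the loop.
>               collector.emit("flush-stream", new Values("flushed"));
>           } else {
>               process(tuple); // normal processing, accumulate for the DB
>           }
>           collector.ack(tuple);
>       }
>
>       @Override
>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>           declarer.declareStream("flush-stream", new Fields("signal"));
>       }
>
>       private void flushToDb() { /* write accumulated tuples to SQL */ }
>       private void process(Tuple t) { /* business logic */ }
>   }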
> -Matthias

On 05/08/2016 02:26 PM, Navin Ipe wrote:
> @Matthias: I agree about the batch processor, but my superior took the
> decision to use Storm, and he envisions more complexity later, for
> which he needs Storm.
> I had considered the "end of stream" tuple earlier (my idea was to emit
> 10 consecutive nulls), but then the question was: how do I know how
> many bolt instances have been created, and how do I notify all the
> bolts? Because it is only after the last bolt finishes writing to the
> DB that I have to shut down the topology.
>
> @Jason: Thanks. I had seen storm-signals earlier (I think from one of
> your replies to someone else) and I had a look at the code too, but I
> am a bit wary because it is no longer being maintained, and because of
> the issues: https://github.com/ptgoetz/storm-signals/issues

On Sun, May 8, 2016 at 5:40 AM, Jason Kusar <ja...@kusar.net> wrote:
> You might want to check out Storm Signals:
> https://github.com/ptgoetz/storm-signals
>
> It might give you what you're looking for.

On Sat, May 7, 2016, 11:59 AM Matthias J. Sax <mj...@apache.org> wrote:
> As you mentioned already: Storm is designed to run topologies
> forever ;) If you have finite data, why do you not use a batch
> processor?
>
> As a workaround, you can embed "control messages" in your stream (or
> use an additional stream for them).
>
> If you want a topology to shut down itself, you could use
>
>   NimbusClient.getConfiguredClient(conf).getClient().killTopology(name);
>
> in your spout/bolt code.
>
> Something like:
> - Spout emits all tuples
> - Spout emits a special "end of stream" control tuple
> - Bolt1 processes everything
> - Bolt1 forwards the "end of stream" control tuple (when it receives it)
> - Bolt2 processes everything
> - Bolt2 receives the "end of stream" control tuple => flush to DB
>   => kill topology
>
> But I guess this is kind of a weird pattern.
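> A sketch of the spout side of that pattern (untested; the data source
> and field names are placeholders):
>
>   import java.util.Arrays;
>   import java.util.Iterator;
>   import java.util.Map;
>   import org.apache.storm.spout.SpoutOutputCollector;
>   import org.apache.storm.task.TopologyContext;
>   import org.apache.storm.topology.OutputFieldsDeclarer;
>   import org.apache.storm.topology.base.BaseRichSpout;
>   import org.apache.storm.tuple.Fields;
>   import org.apache.storm.tuple.Values;
>
>   public class FiniteSpout extends BaseRichSpout {
>       private SpoutOutputCollector collector;
>       private Iterator<String> source; // placeholder finite data source
>       private boolean eosSent;
>
>       @Override
>       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>           this.collector = collector;
>           source = Arrays.asList("a", "b", "c").iterator();
>       }
>
>       @Override
>       public void nextTuple() {
>           if (source.hasNext()) {
>               collector.emit(new Values(source.next(), false));
>           } else if (!eosSent) {
>               // Final tuple: flagged as the end-of-stream control marker.
>               collector.emit(new Values(null, true));
>               eosSent = true;
>           }
>           // After that, emit nothing and idle until the topology is killed.
>       }
>
>       @Override
>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>           declarer.declare(new Fields("value", "eos"));
>       }
>   }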
> -Matthias

On 05/05/2016 06:13 AM, Navin Ipe wrote:
> Hi,
>
> I know Storm is designed to run forever. I also know about Trident's
> technique of aggregation. But shouldn't Storm have a way to let bolts
> know that a certain bunch of processing has been completed?
>
> Consider this topology:
>
>   Spout------>Bolt-A------>Bolt-B
>           |            |--->Bolt-B
>           |            \--->Bolt-B
>           |--->Bolt-A------>Bolt-B
>           |            |--->Bolt-B
>           |            \--->Bolt-B
>           \--->Bolt-A------>Bolt-B
>                        |--->Bolt-B
>                        \--->Bolt-B
>
> * From Bolt-A to Bolt-B, it is a FieldsGrouping.
> * The Spout emits only a few tuples and then stops emitting.
> * Bolt-A takes those tuples and generates millions of tuples.
>
> Bolt-B accumulates the tuples that Bolt-A sends and needs to know when
> the Spout has finished emitting. Only then can Bolt-B start writing to
> SQL.
>
> Questions:
> 1. How can all Bolt-B instances be notified that it is time to write
>    to SQL?
> 2. After all Bolt-B instances have written to SQL, how do we know that
>    they have all completed writing?
> 3. How do we stop the topology? I know of
>    localCluster.killTopology("HelloStorm"), but shouldn't there be a
>    way to do it from the Bolt?
>
> --
> Regards,
> Navin