@Spico: Will share. The streams implementation is working beautifully. Only the topology killing is failing.

*Tried:*

    Map conf = Utils.readStormConfig();
    NimbusClient cc = NimbusClient.getConfiguredClient(conf);
    Nimbus.Client client = cc.getClient();
    client.killTopology("myStorm");

*I get these errors:*

    29442 [Thread-32-topologyKillerBolt-executor[16 16]] WARN o.a.s.u.NimbusClient - Ignoring exception while trying to get leader nimbus info from localhost. will retry with a different seed host.
    java.lang.RuntimeException: org.apache.storm.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
    Caused by: org.apache.storm.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
    Caused by: java.net.ConnectException: Connection refused
    29445 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util - Async loop died!
    java.lang.RuntimeException: org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
    Caused by: org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
    29462 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util - Halting process: ("Worker died")
    java.lang.RuntimeException: ("Worker died")

The error is apparently on this line:

    NimbusClient cc = NimbusClient.getConfiguredClient(conf);
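Judging by the NimbusLeaderNotFoundException, the client is falling back to the default seed host "localhost", so the direction the error message itself points to is setting nimbus.seeds explicitly before building the client. A rough, untested sketch of that ("nimbus-host" and the topology name are placeholders for the real values):

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.generated.KillOptions;
    import org.apache.storm.generated.Nimbus;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class TopologyKiller {
        // Untested sketch: kill a topology from bolt code on a real cluster.
        // "nimbus-host" is a placeholder for the machine running Nimbus.
        public static void kill(String topologyName) throws Exception {
            Map conf = Utils.readStormConfig();
            // The log shows the client retrying "localhost"; point
            // nimbus.seeds at the host where Nimbus actually runs.
            conf.put(Config.NIMBUS_SEEDS, Arrays.asList("nimbus-host"));

            Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();

            // Optionally let in-flight tuples drain before the kill.
            KillOptions opts = new KillOptions();
            opts.set_wait_secs(10);
            client.killTopologyWithOpts(topologyName, opts);
        }
    }

Note that if this code runs inside LocalCluster, there is no Thrift Nimbus listening at all, which would produce exactly this "Connection refused"; in local mode, localCluster.killTopology("myStorm") is the equivalent call.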
On Mon, May 9, 2016 at 3:15 PM, Spico Florin <spicoflo...@gmail.com> wrote:

> Hi!
> You're welcome, Navin. I'm also interested in the solution. Can you
> please share your remarks (and some code :)) after the implementation?
> Thanks.
> Regards,
> Florin
>
> On Mon, May 9, 2016 at 7:20 AM, Navin Ipe <navin....@searchlighthealth.com> wrote:
>
>> @Matthias: That's genius! I didn't know streams and allGroupings could
>> be used like that.
>> Given the way Storm introduced tick tuples, it would have been nice if
>> Storm had a native technique for doing all this, but the ideas you've
>> come up with are extremely good. I am going to try implementing them
>> right away.
>> Thank you too, Florin!
>>
>> On Mon, May 9, 2016 at 12:48 AM, Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> To synchronize this, use an additional "shut down bolt" that uses a
>>> parallelism of one. The "shut down bolt" must be notified by all
>>> parallel DbBolts after they have performed the flush. Once all
>>> notifications are received, there are no in-flight messages, and thus
>>> the "shut down bolt" can kill the topology safely.
>>>
>>> -Matthias
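A rough sketch of that "shut down bolt", as I understand it (untested; the "DbBolt" component name and "myStorm" topology name are assumptions, and every DbBolt task is expected to emit exactly one flush-done tuple):

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    // Runs with parallelism one and subscribes to a "flush-done" stream
    // from every DbBolt task.
    public class ShutdownBolt extends BaseRichBolt {
        private int expected;      // number of parallel DbBolt tasks
        private int received = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            // The TopologyContext knows how many DbBolt instances exist.
            expected = context.getComponentTasks("DbBolt").size();
        }

        @Override
        public void execute(Tuple tuple) {
            // One notification per DbBolt after its flush; once all have
            // reported, no in-flight messages remain.
            if (++received == expected) {
                try {
                    NimbusClient.getConfiguredClient(Utils.readStormConfig())
                                .getClient().killTopology("myStorm");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output streams
        }
    }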
>>>
>>> On 05/08/2016 07:27 PM, Spico Florin wrote:
>>>
>>>> Hi!
>>>> There is the solution of sending a "poison pill" message from the
>>>> spout. One bolt will receive your poison pill and will kill the
>>>> topology via the Storm Nimbus API. One potential issue with this
>>>> approach is that, depending on your topology structure (the
>>>> parallelism of your bolts and the time they need to execute their
>>>> business logic), the poison pill may be swallowed by the one bolt
>>>> responsible for killing the topology before all the other in-flight
>>>> messages have been processed. The consequence is that you cannot be
>>>> sure that all the messages sent by the spout were processed. Also,
>>>> sharing the total number of sent messages between the executors in
>>>> order to shut down when all messages were processed could be error
>>>> prone, since a tuple can be processed many times (depending on your
>>>> message-processing guarantee) or could fail.
>>>> I could not find a solution for this. Storm is intended to run for
>>>> unbounded data.
>>>> I hope this helps.
>>>> Regards,
>>>> Florin
>>>>
>>>> On Sunday, May 8, 2016, Matthias J. Sax <mj...@apache.org> wrote:
>>>>
>>>>> You can get the number of bolt instances from the TopologyContext
>>>>> that is provided in Bolt.prepare().
>>>>>
>>>>> Furthermore, you could put a loop into your topology, i.e., a bolt
>>>>> reads its own output; if you broadcast (i.e., allGrouping) this
>>>>> feedback-loop stream, you can let bolt instances talk to each other:
>>>>>
>>>>>     builder.setBolt("DbBolt", new MyDBBolt())
>>>>>            .shuffleGrouping("spout")
>>>>>            .allGrouping("DbBolt", "flush-stream");
>>>>>
>>>>> where "flush-stream" is a second output stream of MyDBBolt(),
>>>>> sending a notification tuple after it received the end-of-stream
>>>>> from the spout; furthermore, once a bolt has received the signal
>>>>> via "flush-stream" from *all* parallel bolt instances, it can flush
>>>>> to DB.
>>>>>
>>>>> Or something like this... Be creative! :)
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 05/08/2016 02:26 PM, Navin Ipe wrote:
>>>>>
>>>>>> @Matthias: I agree about the batch processor, but my superior took
>>>>>> the decision to use Storm, and he visualizes more complexity later
>>>>>> for which he needs Storm.
>>>>>> I had considered the "end of stream" tuple earlier (my idea was to
>>>>>> emit 10 consecutive nulls), but then the question was how do I
>>>>>> know how many bolt instances have been created, and how do I
>>>>>> notify all the bolts? Because it's only after the last bolt
>>>>>> finishes writing to DB that I have to shut down the topology.
>>>>>>
>>>>>> @Jason: Thanks. I had seen storm-signals earlier (I think from one
>>>>>> of your replies to someone else) and I had a look at the code too,
>>>>>> but am a bit wary because it's no longer being maintained and
>>>>>> because of the issues: https://github.com/ptgoetz/storm-signals/issues
>>>>>>
>>>>>> On Sun, May 8, 2016 at 5:40 AM, Jason Kusar <ja...@kusar.net> wrote:
>>>>>>
>>>>>>> You might want to check out Storm Signals:
>>>>>>> https://github.com/ptgoetz/storm-signals
>>>>>>>
>>>>>>> It might give you what you're looking for.
>>>>>>>
>>>>>>> On Sat, May 7, 2016, 11:59 AM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>
>>>>>>>> As you mentioned already: Storm is designed to run topologies
>>>>>>>> forever ;)
>>>>>>>> If you have finite data, why do you not use a batch processor?
>>>>>>>>
>>>>>>>> As a workaround, you can embed "control messages" in your stream
>>>>>>>> (or use an additional stream for them).
>>>>>>>>
>>>>>>>> If you want a topology to shut down itself, you could use
>>>>>>>>
>>>>>>>>     NimbusClient.getConfiguredClient(conf).getClient().killTopology(name);
>>>>>>>>
>>>>>>>> in your spout/bolt code. Something like:
>>>>>>>> - Spout emits all tuples
>>>>>>>> - Spout emits a special "end of stream" control tuple
>>>>>>>> - Bolt1 processes everything
>>>>>>>> - Bolt1 forwards the "end of stream" control tuple (when it
>>>>>>>>   receives it)
>>>>>>>> - Bolt2 processes everything
>>>>>>>> - Bolt2 receives the "end of stream" control tuple => flushes to
>>>>>>>>   DB => kills the topology
>>>>>>>>
>>>>>>>> But I guess this is kind of a weird pattern.
>>>>>>>>
>>>>>>>> -Matthias
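A minimal sketch of that end-of-stream control tuple, the way I picture it (untested; the "data"/"control" stream names and field names are my assumptions):

    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    // A spout with a dedicated "control" stream: it emits its finite
    // data, then exactly one end-of-stream marker.
    public class FiniteSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private int remaining = 1000;   // stand-in for the finite input
        private boolean eosSent = false;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            if (remaining > 0) {
                collector.emit("data", new Values(remaining--));
            } else if (!eosSent) {
                // Bolts that subscribe with allGrouping("spout", "control")
                // all see this single marker tuple.
                collector.emit("control", new Values("EOS"));
                eosSent = true;
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("data", new Fields("value"));
            declarer.declareStream("control", new Fields("marker"));
        }
    }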
>>>>>>>>
>>>>>>>> On 05/05/2016 06:13 AM, Navin Ipe wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I know Storm is designed to run forever. I also know about
>>>>>>>>> Trident's technique of aggregation. But shouldn't Storm have a
>>>>>>>>> way to let bolts know that a certain bunch of processing has
>>>>>>>>> been completed?
>>>>>>>>>
>>>>>>>>> Consider this topology:
>>>>>>>>>
>>>>>>>>>     Spout------>Bolt-A------>Bolt-B
>>>>>>>>>       |            |--->Bolt-B
>>>>>>>>>       |            \--->Bolt-B
>>>>>>>>>       |--->Bolt-A------>Bolt-B
>>>>>>>>>       |            |--->Bolt-B
>>>>>>>>>       |            \--->Bolt-B
>>>>>>>>>       \--->Bolt-A------>Bolt-B
>>>>>>>>>                    |--->Bolt-B
>>>>>>>>>                    \--->Bolt-B
>>>>>>>>>
>>>>>>>>> * From Bolt-A to Bolt-B, it is a FieldsGrouping.
>>>>>>>>> * Spout emits only a few tuples and then stops emitting.
>>>>>>>>> * Bolt-A takes those tuples and generates millions of tuples.
>>>>>>>>>
>>>>>>>>> *Bolt-B accumulates the tuples that Bolt-A sends and needs to
>>>>>>>>> know when Spout finished emitting. Only then can Bolt-B start
>>>>>>>>> writing to SQL.*
>>>>>>>>>
>>>>>>>>> *Questions:*
>>>>>>>>> 1. How can all Bolts B be notified that it is time to write to SQL?
>>>>>>>>> 2. After all Bolts B have written to SQL, how do we know that
>>>>>>>>>    all Bolts B have completed writing?
>>>>>>>>> 3. How do we stop the topology? I know of
>>>>>>>>>    localCluster.killTopology("HelloStorm"), but shouldn't there
>>>>>>>>>    be a way to do it from the Bolt?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Navin
>>
>> --
>> Regards,
>> Navin

--
Regards,
Navin
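P.S. For anyone finding this thread later, the wiring from the diagram above, with the broadcast control stream the thread converged on, would look roughly like this (a sketch; BoltA and BoltB stand in for the actual bolt classes, the "key" field and the parallelism numbers are arbitrary, and FiniteSpout is the sketch from earlier in the thread):

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    // Spout -> Bolt-A (shuffle) -> Bolt-B (fields grouping), plus the
    // broadcast control stream so every Bolt-B task sees the
    // end-of-stream marker. BoltA/BoltB are placeholders.
    public class FiniteTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new FiniteSpout());
            builder.setBolt("BoltA", new BoltA(), 3)
                   .shuffleGrouping("spout", "data");
            builder.setBolt("BoltB", new BoltB(), 9)
                   .fieldsGrouping("BoltA", new Fields("key"))
                   .allGrouping("spout", "control");  // broadcast EOS marker

            // Local run for testing; on a cluster use StormSubmitter instead.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HelloStorm", new Config(), builder.createTopology());
        }
    }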