Navin, I think these two lines are not clear to me, so I may have misunderstood:
5. When all the Bolts in the workers are finished, they send their outputs to a single Bolt in Worker 11.
6. The Bolt in Worker 11 writes the processed value to a new MySQL table.

If you don't need to aggregate (I mean join) the results from the Bolts in Workers 1~10 at the Bolt in Worker 11, there's no need to use Trident or windowing.

On Wed, Apr 20, 2016 at 3:28 PM, Navin Ipe <navin....@searchlighthealth.com> wrote:

> @Jungtaek: This person (
> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
> claims that Storm automatically manages the flow of data between spouts
> and bolts on different workers. Can anyone confirm this? If that is the
> case, I won't have to bother using Trident.
>
> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
> navin....@searchlighthealth.com> wrote:
>
>> @Jason: Thanks. I tried searching for the Storm code which creates
>> ephemeral nodes, but couldn't find it. (I am new to Hadoop and Storm,
>> so perhaps I was searching for the wrong thing.)
>>
>> @Jungtaek: I will explore component tasks. Meanwhile, I had considered
>> Trident, but didn't go ahead because it wasn't clear how I could
>> implement multiple spouts in Trident, where each spout would iterate
>> over a certain number of rows of a database. Any idea how that could
>> be done?
>>
>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> There's another idea that doesn't rely on Zookeeper: use the ordinal
>>> of the task id among tasks of the same component (spout).
>>>
>>> Task ids are issued across all tasks, including system tasks, so you
>>> can't assume spout tasks have sequential task ids. But you can still
>>> do the trick - check the "ordinal" of this spout task's id among the
>>> tasks of the same spout. Please refer to
>>> GeneralTopologyContext.getComponentTasks(String componentId).
>>>
>>> Btw, Spout1 -> Bolt2 can be done in various ways, but it would not be
>>> easy to aggregate the results of Bolt2 in Bolt3. You should consider
>>> windowing by processing time, or Trident, or maintaining your own
>>> buffers.
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Tue, Apr 19, 2016 at 10:02 PM, Jason Kusar <ja...@kusar.net> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've done a similar thing before, with the exception that I was
>>>> reading from Cassandra. The concept is the same, though. Assuming
>>>> you know that you have 10,000 records and you want each spout to
>>>> read 1,000 of them, you would launch 10 instances of the spout. The
>>>> first thing they do during init is connect to Zookeeper and create
>>>> an ephemeral node (
>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes),
>>>> starting with one called '0'. If '0' already exists, you'll get an
>>>> exception, which means you try to create '1', and so on until you
>>>> successfully create a node. That tells you which batch of records
>>>> that instance of the spout is responsible for. I.e., if you
>>>> successfully created '3', then this spout needs to set its offset
>>>> to 3,000.
>>>>
>>>> The reason for using ephemeral nodes is that they are automatically
>>>> deleted if the Zookeeper client disconnects. That way, if a spout
>>>> crashes, once Storm relaunches it, it will be able to re-claim that
>>>> token and resume work on that batch. You'll obviously need some way
>>>> to keep track of which records you've already processed, but that's
>>>> going to be specific to your implementation.
>>>>
>>>> Hope that helps!
>>>> Jason
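For concreteness, here is a minimal sketch of the ephemeral-node claim
Jason describes, using the plain ZooKeeper Java client. The parent path
"/spout-batches", the class name BatchClaimer, and the maxSpouts
parameter are illustrative assumptions, not anything from the thread,
and the parent znode is assumed to already exist:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public final class BatchClaimer {

        // Tries to create /spout-batches/0, /spout-batches/1, ... until
        // one succeeds. The ordinal that was won tells this spout
        // instance which batch it owns (e.g. 3 -> start at offset 3,000).
        public static int claimBatch(ZooKeeper zk, int maxSpouts)
                throws KeeperException, InterruptedException {
            for (int ordinal = 0; ordinal < maxSpouts; ordinal++) {
                try {
                    zk.create("/spout-batches/" + ordinal,
                            new byte[0],
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.EPHEMERAL); // removed when the session dies
                    return ordinal;
                } catch (KeeperException.NodeExistsException taken) {
                    // Another spout instance owns this ordinal; try the next.
                }
            }
            throw new IllegalStateException("no free batch ordinal left");
        }
    }

Because the node is ephemeral, it disappears when the owning session
dies, so a relaunched spout can simply call claimBatch() again and take
over the freed slot, as Jason notes.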
>>>>
>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>> navin....@searchlighthealth.com> wrote:
>>>>
>>>>> Thanks, guys.
>>>>> I didn't understand "*...spout instances by utilizing Zookeeper.*"
>>>>> How does one utilize Zookeeper? Is it the same as ".setNumTasks(10)"
>>>>> for a Spout?
>>>>>
>>>>> As of now I've set
>>>>>     config.setNumWorkers(2);
>>>>> and
>>>>>     builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>
>>>>> I'm able to get the spout's task id in open() using
>>>>>     this.spoutId = context.getThisTaskId();
>>>>> Strangely, my spoutId always begins at 3 instead of 0.
>>>>>
>>>>> By partition id, I understand that's the fieldsGrouping's id.
>>>>>
>>>>> Even if I do all this, will the spout's tasks actually be
>>>>> distributed across multiple workers? Won't I have to create
>>>>> separate spouts?
>>>>>     builder.setSpout("mongoSpout1", new MongoSpout());
>>>>>     builder.setSpout("mongoSpout2", new MongoSpout());
>>>>>     builder.setSpout("mongoSpout3", new MongoSpout());
>>>>> and so on?
>>>>>
>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <
>>>>> mittspamko...@gmail.com> wrote:
>>>>>
>>>>>> Correction - group on partition id
>>>>>>
>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <navin....@searchlighthealth.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've seen this:
>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>> I'm requesting a bit of clarity.
>>>>>>>
>>>>>>> I'm considering a situation where I have 2 million rows in MySQL
>>>>>>> or MongoDB.
>>>>>>>
>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>> processed output to a Bolt. This happens in Worker 1.
>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>> next 1000 rows in parallel with the Spout of 1, then send the
>>>>>>> processed output to an instance of the same Bolt used in 1. This
>>>>>>> happens in Worker 2.
>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>> 4. I might set up 10 workers like this.
>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>> table.
>>>>>>>
>>>>>>> *My confusion here is in how to make the database iterations
>>>>>>> happen batch by batch, in parallel.* Obviously the database
>>>>>>> connection would have to be made in some static class outside the
>>>>>>> workers, but if workers are started with just
>>>>>>> "conf.setNumWorkers(2);", then how do I tell the workers to
>>>>>>> iterate over different rows of the database? Assume that the
>>>>>>> workers are running on different machines.
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Navin
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Navin
>>>>
>>
>>
>> --
>> Regards,
>> Navin
>
>
> --
> Regards,
> Navin
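To tie the suggestions above together, here is a hedged sketch of a
spout that derives its batch offset from its task ordinal, along the
lines Jungtaek describes. The class name, BATCH_SIZE, and the stubbed
database calls are illustrative assumptions; the imports use the
org.apache.storm packages of Storm 1.x (on 0.10.x the same classes live
under backtype.storm):

    import java.util.List;
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;

    public class PartitionedDbSpout extends BaseRichSpout {

        private static final int BATCH_SIZE = 1000; // rows owned by each task
        private SpoutOutputCollector collector;
        private long offset;

        @Override
        public void open(Map conf, TopologyContext context,
                         SpoutOutputCollector collector) {
            this.collector = collector;
            // Task ids are issued across ALL tasks (system tasks
            // included), which is why getThisTaskId() starts at 3 rather
            // than 0. The ordinal among this component's own tasks does
            // start at 0:
            List<Integer> peers =
                    context.getComponentTasks(context.getThisComponentId());
            int ordinal = peers.indexOf(context.getThisTaskId());
            // (Storm exposes the same value as context.getThisTaskIndex().)
            this.offset = (long) ordinal * BATCH_SIZE; // task 0 -> rows 0..999
            // Open the DB connection here and seek to `offset`.
        }

        @Override
        public void nextTuple() {
            // Read the next row in [offset, offset + BATCH_SIZE) and
            // emit it, e.g. collector.emit(new Values(row)); tracking
            // which rows were already processed is up to you.
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("row"));
        }
    }

With this, a single declaration such as
builder.setSpout("mongoSpout", new PartitionedDbSpout(), 10) covers all
ten batches, so separate mongoSpout1/mongoSpout2/... declarations are
not needed, and Storm routes tuples between tasks regardless of which
workers they land on, which is also what the linked Stack Overflow
answer says.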