Hi,

I've done something similar before, except that I was reading from Cassandra. The concept is the same, though. Assuming you know you have 10,000 records and you want each spout instance to read 1,000 of them, you would launch 10 instances of the spout. The first thing each instance does during init is connect to ZooKeeper and try to create an ephemeral node (http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes), starting with one called '0'. If '0' already exists, you'll get an exception, which means you try to create '1', and so on until you successfully create a node. The name of that node tells you which batch of records that instance of the spout is responsible for. For example, if you successfully created '3', then this spout needs to set its offset to 3,000.
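
Roughly, the claim loop could look like the sketch below. This is a minimal illustration, not the code I actually ran: `SlotClaimer`, `NodeStore`, `InMemoryNodeStore`, `claimSlot` and `startOffset` are all made-up names, and the in-memory store only stands in for ZooKeeper so the example runs without a server. With the real Java client, `tryCreateEphemeral` would presumably wrap `ZooKeeper.create(...)` with `CreateMode.EPHEMERAL` and return false when it throws `KeeperException.NodeExistsException`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of "claim the first free numbered node". All names here are hypothetical. */
class SlotClaimer {

    /** Stand-in for a ZooKeeper client: true if the node was created, false if it already existed. */
    interface NodeStore {
        boolean tryCreateEphemeral(String path);
    }

    /** In-memory substitute for ZooKeeper, used only to keep the sketch self-contained. */
    static class InMemoryNodeStore implements NodeStore {
        private final Set<String> nodes = ConcurrentHashMap.newKeySet();

        @Override
        public boolean tryCreateEphemeral(String path) {
            // Set.add is atomic here, so only one caller can win a given path.
            return nodes.add(path);
        }

        /** Simulates ZooKeeper deleting an ephemeral node once its owner's session has ended. */
        void expire(String path) {
            nodes.remove(path);
        }
    }

    /** Tries basePath/0, basePath/1, ... and returns the first slot claimed, or -1 if all are taken. */
    static int claimSlot(NodeStore store, String basePath, int totalSlots) {
        for (int slot = 0; slot < totalSlots; slot++) {
            if (store.tryCreateEphemeral(basePath + "/" + slot)) {
                return slot;
            }
        }
        return -1;
    }

    /** Maps a slot to its first record, e.g. slot 3 with batches of 1,000 starts at 3,000. */
    static long startOffset(int slot, int batchSize) {
        return (long) slot * batchSize;
    }
}
```

In a spout you would call something like `claimSlot(store, "/spouts", 10)` from `open()` and then start reading at `startOffset(slot, 1000)`. A return value of -1 means every batch is already owned by another instance.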

The reason for using ephemeral nodes is that ZooKeeper deletes them automatically when the client that created them disconnects and its session ends. That way, if a spout crashes, once Storm relaunches it, the new instance can re-claim that token and resume work on that batch. You'll obviously need some way to keep track of which records you've already processed, but that's going to be specific to your implementation.

Hope that helps!

Jason

On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <navin....@searchlighthealth.com> wrote:

> Thanks guys.
> I didn't understand "*...spout instances by utilizing ZooKeeper.*". How
> does one utilize ZooKeeper? Is it the same as ".setNumTasks(10)" for a Spout?
>
> As of now I've set
> config.setNumWorkers(2);
> and
> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>
> I'm able to get spoutID in open() using this.spoutId =
> context.getThisTaskId();
> Strangely, my spoutID always begins with 3 instead of 0.
>
> By partitionID I understand that's the fieldGrouping's id.
>
> Even if I do all this, will the spout's tasks actually be distributed
> across multiple workers? Won't I have to create separate spouts?
> builder.setSpout("mongoSpout1", new MongoSpout());
> builder.setSpout("mongoSpout2", new MongoSpout());
> builder.setSpout("mongoSpout3", new MongoSpout());
> and so on?
>
> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mittspamko...@gmail.com>
> wrote:
>
>> Correction - group on partition id
>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <navin....@searchlighthealth.com>
>> wrote:
>>
>>> I've seen this:
>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>> but it doesn't explain how workers coordinate with each other, so
>>> requesting a bit of clarity.
>>>
>>> I'm considering a situation where I have 2 million rows in MySQL or
>>> MongoDB.
>>>
>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>> processed output to a Bolt. This happens in Worker1.
>>> 2. I want a different instance of the same Spout class to read the next
>>> 1000 rows in parallel with the working of the Spout of 1, then send the
>>> processed output to an instance of the same Bolt used in 1. This happens in
>>> Worker2.
>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>> 4. I might set up 10 workers like this.
>>> 5. When all the Bolts in the workers are finished, they send their
>>> outputs to a single Bolt in Worker 11.
>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>>
>>> *My confusion here is in how to make the database iterations happen
>>> batch by batch, in parallel*. Obviously the database connection would
>>> have to be made in some static class outside the workers, but if workers
>>> are started with just "conf.setNumWorkers(2);", then how do I tell the
>>> workers to iterate different rows of the database? Assuming that the
>>> workers are running in different machines.
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>
> --
> Regards,
> Navin