Hi,

I've done something similar before, except that I was reading from
Cassandra.  The concept is the same, though.  Assuming you know that you
have 10,000 records and you want each spout to read 1,000 of them, you
would launch 10 instances of the spout.  The first thing each instance does
during init is connect to ZooKeeper and try to create an ephemeral node (
http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes),
starting with one called '0'.  If '0' already exists, you'll get an exception,
in which case you try to create '1', and so on until you successfully create
a node.  The name of that node tells you which batch of records that instance
of the spout is responsible for.  I.e., if you successfully created '3', then
this spout needs to set its offset to 3,000.
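
To make the claim loop concrete, here is a rough Java sketch.  It is only an
illustration, not the code I actually ran: BatchClaim, NodeStore,
InMemoryStore and the "/spout-slots" path are hypothetical names, and the
in-memory store just stands in for ZooKeeper so the example runs on its own.
With the real client, tryCreate() would wrap a call to ZooKeeper.create(path,
data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) and return false
when it throws KeeperException.NodeExistsException.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the "claim the first free node" loop. All names are hypothetical. */
final class BatchClaim {

    /** Stand-in for "create an ephemeral node"; returns false if the node already exists. */
    interface NodeStore {
        boolean tryCreate(String path);
    }

    /** In-memory replacement for ZooKeeper so the sketch runs without a cluster. */
    static final class InMemoryStore implements NodeStore {
        private final Set<String> nodes = ConcurrentHashMap.newKeySet();

        @Override
        public boolean tryCreate(String path) {
            // Set.add is atomic here and returns false when the path is already taken.
            return nodes.add(path);
        }
    }

    /** Tries "0", "1", ... in order; returns the claimed slot, or -1 if every slot is taken. */
    static int claimSlot(NodeStore store, String parentPath, int slotCount) {
        for (int slot = 0; slot < slotCount; slot++) {
            if (store.tryCreate(parentPath + "/" + slot)) {
                return slot;
            }
        }
        return -1;
    }

    /** Slot 3 with a batch size of 1,000 starts reading at offset 3,000. */
    static long offsetFor(int slot, int batchSize) {
        return (long) slot * batchSize;
    }

    public static void main(String[] args) {
        NodeStore store = new InMemoryStore();
        // Simulate three spout instances starting up one after another.
        for (int i = 0; i < 3; i++) {
            int slot = claimSlot(store, "/spout-slots", 10);
            System.out.println("claimed slot " + slot + ", offset " + offsetFor(slot, 1000));
        }
    }
}
```

In a real spout you would call something like claimSlot() from open() and keep
the ZooKeeper session alive for as long as the spout runs, since the claim
only lasts as long as the session does.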

The reason for using ephemeral nodes is that they are automatically deleted
when the ZooKeeper client's session ends.  That way, if a spout crashes, then
once Storm relaunches it, the new instance will be able to re-claim that
token and resume work on that batch.  You'll obviously need some way to keep
track of which records you've already processed, but that's going to be
specific to your implementation.

Hope that helps!
Jason

On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <navin....@searchlighthealth.com>
wrote:

> Thanks guys.
> I didn't understand "*...spout instances by utilizing ZooKeeper.*". How
> does one utilize ZooKeeper? Is it the same as ".setNumTasks(10)" for a Spout?
>
> As of now I've set
> config.setNumWorkers(2);
> and
> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>
> I'm able to get spoutID in open() using this.spoutId =
> context.getThisTaskId();
> Strangely, my spoutID always begins with 3 instead of 0.
>
> By partitionID I understand that's the fieldGrouping's id.
>
> Even if I do all this, will the spout's tasks actually be distributed
> across multiple workers? Won't I have to create separate spouts?
> builder.setSpout("mongoSpout1", new MongoSpout());
> builder.setSpout("mongoSpout2", new MongoSpout());
> builder.setSpout("mongoSpout3", new MongoSpout());
> and so on?
>
>
>
> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mittspamko...@gmail.com>
> wrote:
>
>> Correction - group on partition id
>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <navin....@searchlighthealth.com>
>> wrote:
>>
>>> I've seen this:
>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>> but it doesn't explain how workers coordinate with each other, so
>>> requesting a bit of clarity.
>>>
>>> I'm considering a situation where I have 2 million rows in MySQL or
>>> MongoDB.
>>>
>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>> processed output to a Bolt. This happens in Worker1.
>>> 2. I want a different instance of the same Spout class to read the next
>>> 1000 rows in parallel with the working of the Spout of 1, then send the
>>> processed output to an instance of the same Bolt used in 1. This happens in
>>> Worker2.
>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>> 4. I might set up 10 workers like this.
>>> 5. When all the Bolts in the workers are finished, they send their
>>> outputs to a single Bolt in Worker 11.
>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>>
>>> *My confusion here is in how to make the database iterations happen
>>> batch by batch, in parallel*. Obviously the database connection would
>>> have to be made in some static class outside the workers, but if workers
>>> are started with just "conf.setNumWorkers(2);", then how do I tell the
>>> workers to iterate different rows of the database? Assuming that the
>>> workers are running in different machines.
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>
>
> --
> Regards,
> Navin
>
