Navin,

I think this two lines are not cleared so I may have misunderstand.

5. When all the Bolts in the workers are finished, they send their outputs
to a single Bolt in Worker 11.
6. The Bolt in Worker 11 writes the processed value to a new MySQL table.

If you don't need to aggregate (I mean join) the results from Bolt in
Worker1~10 to Bolt in Worker11, no need to use Trident or Windowing.

2016년 4월 20일 (수) 오후 3:28, Navin Ipe <navin....@searchlighthealth.com>님이 작성:

> @Jungtaek: This person (
> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
> claims that Storm would automatically manage the flow of data between
> spouts and blots on different workers. Can anyone confirm this? If this is
> the case, I won't have to bother using Trident.
>
> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
> navin....@searchlighthealth.com> wrote:
>
>> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
>> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
>> searching for the wrong thing)
>>
>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
>> Trident, but didn't go ahead because it was not clear how I could implement
>> multiple spouts in Trident, where each spout would iterate a certain number
>> of rows of a database. Any idea how that could happen.
>>
>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> There's other idea without relying on Zookeeper : use ordinal of task id
>>> between same components (spout)
>>>
>>> Task id is issued across all tasks including system tasks so you can't
>>> assume spout tasks are having task id sequentially, but whatever you can do
>>> the trick - check "ordinal" of this spout task's id around same spouts.
>>> Please refer GeneralTopologyContext.getComponentTasks(String
>>> componentId).
>>>
>>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>>> easy to aggregate the results of Bolt2 from Bolt3.
>>> You should consider windowing by processed time or Trident or maintain
>>> your own buffers.
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:
>>>
>>>> Hi,
>>>>
>>>> I've done a similar thing before with the exception that I was reading
>>>> from Cassandra.  The concept is the same though.  Assuming you know that
>>>> you have 10,000 records and you want each spout to read 1,000 of them, then
>>>> you would launch 10 instances of the spouts.  The first thing they do
>>>> during init is to connect to zookeeper and create an ephemeral node (
>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>>> starting with one called '0'.  If 0 already exists, you'll get an exception
>>>> which means you try to create '1' and so on until you successfully create a
>>>> node.  That tells you which batch of records that instance of the spout is
>>>> responsible for.  I.e., if you successfully created '3', then this spout
>>>> needs to set its offset to 3,000.
>>>>
>>>> The reason for using ephemeral nodes is that they are automatically
>>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>>> once Storm relaunches the spout, it will be able to re-claim that token and
>>>> resume work on that batch.  You'll obviously need to have some way to keep
>>>> track of which records you've already processed, but that's going to be
>>>> specific to your implementation.
>>>>
>>>> Hope that helps!
>>>> Jason
>>>>
>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>> navin....@searchlighthealth.com> wrote:
>>>>
>>>>> Thanks guys.
>>>>> I didn't understand "*...spout instances by utilizing Zookeper.*".
>>>>> How does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a
>>>>> Spout?
>>>>>
>>>>> As of now I've set
>>>>> config.setNumWorkers(2);
>>>>> and
>>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>
>>>>> I'm able to get spoutID in open() using this.spoutId =
>>>>> context.getThisTaskId();
>>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>>
>>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>>
>>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>>> across multiple workers? Won't I have to create separate spouts?
>>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>>> and so on?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mittspamko...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Coreection - group on partition id
>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <navin....@searchlighthealth.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've seen this:
>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>> requesting a bit of clarity.
>>>>>>>
>>>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>>>> MongoDB.
>>>>>>>
>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>>> happens in Worker2.
>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>> 4. I might setup 10 workers like this.
>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>> table.
>>>>>>>
>>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>>> batch by batch, parallelly*. Obviously the database connection
>>>>>>> would have to be made in some static class outside the workers, but if
>>>>>>> workers are started with just "conf.setNumWorkers(2);", then how do
>>>>>>> I tell the workers to iterate different rows of the database? Assuming 
>>>>>>> that
>>>>>>> the workers are running in different machines.
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Navin
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Navin
>>>>>
>>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>
>
>
> --
> Regards,
> Navin
>

Reply via email to