Can spout.nextTuple method be blocked?

2013-12-27 Thread Link Wang
hi, all
I would like to know what will happen if I provide a blocking or infinite-loop 
implementation of nextTuple in my spout. Does it just mean that a task of 
this spout will monopolize an executor thread, or is there some other influence on 
my topology?

Re: Can spout.nextTuple method be blocked?

2013-12-29 Thread Link Wang
Thank you, Nathan! Your reply clears up my puzzle completely.


On Sat, Dec 28, 2013 at 11:18 AM, Nathan Marz  wrote:

> You shouldn't do that, because the spout will be unable to ack or fail
> tuples while it's blocked.
>
>
> On Fri, Dec 27, 2013 at 5:06 PM, Link Wang  wrote:
>
>> hi, all
>> I would like to know what will happen if I provide a blocking or infinite-loop
>> implementation of nextTuple in my spout. Does it just mean that a task of
>> this spout will monopolize an executor thread, or is there some other
>> influence on my topology?
>
>
>
>
> --
> Twitter: @nathanmarz
> http://nathanmarz.com
>
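
For reference, a minimal sketch of the non-blocking shape nextTuple should have; the pending queue, field name and message type are illustrative assumptions, not code from this thread:

    // Sketch: nextTuple returns immediately when there is nothing to emit, so the
    // same executor thread stays free to invoke ack()/fail() on the spout.
    private java.util.Queue<String> pending;   // assumed to be filled elsewhere

    @Override
    public void nextTuple() {
        String msg = pending.poll();           // non-blocking
        if (msg == null) {
            return;                            // never block or spin here
        }
        collector.emit(new Values(msg), msg);  // backtype.storm.tuple.Values; msg doubles as the message id
    }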


How to specify worker.childopts for a specified topology?

2014-02-17 Thread Link Wang
Dear all,
I want to specify some worker.childopts for my topology from within its code,
and I use this approach:
conf.put(Config.WORKER_CHILDOPTS, WORKER_OPTS);
but I found that it doesn't work.

I don't use the storm.yaml file to set worker.childopts, because the memory
requirements of my topologies differ widely.

Has anyone else encountered the same problem?


Re: How to specify worker.childopts for a specified topology?

2014-02-18 Thread Link Wang
Yes! It works when using Config.TOPOLOGY_WORKER_CHILDOPTS.
Thanks a lot!


On Tue, Feb 18, 2014 at 11:50 PM, Derek Dagit  wrote:

> Try this:
>
> conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, WORKER_OPTS);
>
> Your WORKER_OPTS should be appended to WORKER_CHILDOPTS.
> --
> Derek
>
>
> On 2/18/14, 1:47, Link Wang wrote:
>
>> Dear all,
>> I want to specify some worker.childopts for my topology from within its code,
>> and I use this approach:
>> conf.put(Config.WORKER_CHILDOPTS, WORKER_OPTS);
>> but I found that it doesn't work.
>>
>> I don't use the storm.yaml file to set worker.childopts, because the memory
>> requirements of my topologies differ widely.
>>
>> Has anyone else encountered the same problem?
>>
>
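
To make the fix concrete, here is a small sketch of setting per-topology worker JVM options in the topology's own submission code; the -Xmx value and topology name are only illustrative:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;

    // Sketch: per-topology worker JVM options; "builder" is an
    // already-configured TopologyBuilder.
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2048m");
    StormSubmitter.submitTopology("memory-hungry-topology", conf, builder.createTopology());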


How to speed up the spout.nextTuple method?

2014-03-09 Thread Link Wang
hi all,

I have a problem reading a huge volume of messages in my spout: spout.nextTuple
is called too slowly to keep up with reading messages from ZeroMQ like this:
subscriber.recv(0);

This causes too many messages to accumulate on the ZeroMQ subscriber side, so the
memory usage of this worker grows continually.

But when I tried a pure Java application based on jzmq reading from the same ZeroMQ
socket, it worked well; then, when I added a Thread.sleep to that reading loop, the
same problem appeared.

So I think the cause is that spout.nextTuple is called too slowly. Am I right?
Any suggestions?
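
One possible workaround, sketched under the assumption that the spout keeps the jzmq socket in a field named subscriber: do the blocking recv on a dedicated reader thread started in open(), and let nextTuple only drain an in-memory queue.

    // Sketch: blocking ZeroMQ reads run on their own thread; nextTuple never blocks.
    // A bounded queue caps worker memory instead of letting messages pile up.
    private final java.util.concurrent.BlockingQueue<byte[]> buffer =
            new java.util.concurrent.LinkedBlockingQueue<byte[]>(10000);

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Thread reader = new Thread(new Runnable() {
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    byte[] msg = subscriber.recv(0);   // blocking recv is fine on this thread
                    if (msg != null) {
                        buffer.offer(msg);             // drops silently once the queue is full
                    }
                }
            }
        });
        reader.setDaemon(true);
        reader.start();
    }

    @Override
    public void nextTuple() {
        byte[] msg = buffer.poll();                    // non-blocking
        if (msg != null) {
            collector.emit(new Values(new String(msg)));
        }
    }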

Re: Server load - Topology optimization

2014-03-18 Thread Link Wang
Do you filter out messages in the nextTuple method, so that some calls emit no
tuple? Storm has an internal mechanism that sleeps 1 ms before calling nextTuple
again if the previous call emitted nothing.
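
If that back-off matters, the sleep time is configurable per topology; a hedged one-liner, assuming the backtype.storm.Config constant of the same name (value in milliseconds):

    // Sketch: tune the sleep applied after a nextTuple call that emitted nothing.
    conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 1);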

On Mar 19, 2014, at 7:14 AM, David Crossland wrote:
>
> Perhaps these screenshots might shed some light? I don't think there is much 
> of a latency issue.  I'm really starting to suspect there is some consumption 
> rate issue from the topic.
>
> I set the spout to a high parallelism value as it did seem to improve 
> throughput..
>
> But if there is anything you can spot that would be grand
>
> Thanks
> David
>
> From: Nathan Leung
> Sent: Tuesday, 18 March 2014 21:14
> To: user@storm.incubator.apache.org
>
> It could be bolt 3.  What is the latency like between your worker and your 
> redis server?  Increasing the number of threads for bolt 3 will likely 
> increase your throughput.  Bolt 1 and 2 are probably CPU bound, but bolt 3 is 
> probably restricted by your network access.  Also I've found that 
> localOrShuffleGrouping can improve performance due to reduced network 
> communications.
>
>
> On Tue, Mar 18, 2014 at 3:55 PM, David Crossland  
> wrote:
>>
>> A bit more information then
>>
>> There are 4 components
>>
>> Spout - This is reading from an azure service bus topic/subscription.  A 
>> connection is created in the open() method of the spout, nextTuple does a 
>> peek on the message, and invokes the following code;
>>
>>     StringWriter writer = new StringWriter();
>>     IOUtils.copy(message.getBody(), writer);
>>     String messageBody = writer.toString();
>>
>> It then deletes the message from the queue.
>>
>> Overall nothing all that exciting..
>>
>> Bolt 1 - Filtering
>>
>> Parses the message body (json string) and converts it to an object 
>> representation.  Filters out anything that isn't a monetise message.  It 
>> then emits the monetise message object to the next bolt.  Monetise messages 
>> account for ~ 0.03% of the total message volume.
>>
>> Bolt 2 - transformation
>>
>> Basically extracts from the monetise object the values that are interesting
>> and constructs a string, which it emits.
>>
>> Bolt 3 - Storage
>>
>> Stores the transformed string in Redis using the current date/time as key.
>>
>> -
>>
>> Shuffle grouping is used with the topology
>>
>> I ack every tuple irrespective of whether I emit it or not.  It
>> should not be attempting to replay tuples.
>>
>> -
>>
>> I don't think Bolt 2/3 are the cause of the bottleneck.  They don't have to 
>> process much data at all tbh.
>>
>> I can accept that perhaps there is something inefficient with the spout, 
>> perhaps it just can't read from the service bus quickly enough. I will do 
>> some more research on this and have a chat with the colleague who wrote this 
>> component.
>>
>> I suppose I'm just trying to identify if I've configured something 
>> incorrectly with respect to storm, whether I'm correct to relate the total 
>> number of executors and tasks to the total number of cores I have available. 
>>  I find it strange that I get better throughput when I choose an arbitrarily
>> large number for the parallelism hint than if I constrain myself to a
>> maximum that equates to the number of cores.
>>
>> D
>>
>> From: Nathan Leung
>> Sent: Tuesday, 18 March 2014 18:38
>> To: user@storm.incubator.apache.org
>>
>> In my experience storm is able to make good use of CPU resources, if the 
>> application is written appropriately.  You shouldn't require too much 
>> executor parallelism if your application is CPU intensive.  If your bolts 
>> are doing things like remote DB/NoSQL accesses, then that changes things and 
>> parallelizing bolts will give you more throughput.  Not knowing your 
>> application, the best way to pin down the problem is to simplify your 
>> topology.  Cut out everything except for the Spout.  How is your filtering 
>> done?  If you return without emitting, the latest versions of storm will
>> sleep before trying again.  It may be worthwhile to loop in the spout until
>> you receive a valid message, or the bus is empty.  How much throughput can
>> you achieve from the spout, emitting a tuple into the ether?  Maybe the
>> problem is your message bus.  Once you have achieved a level of performance
>> you are satisfied with from the spout, add one bolt.  What bottlenecks does the
>> bolt introduce?  etc etc.
>>
>>
>> On Tue, Mar 18, 2014 at 2:31 PM, David Crossland  
>> wrote:
>>>
>>> Could my issue relate to memory allocated to the JVM? Most of the settings
>>> are pretty much the defaults.  Are there any other settings that could be
>>> throttling the topology?
>>>
>>> I'd like to be able to identify the issue without all this constant 
>>> “stabbing in the dark”… 😃
>>>
>>> D
>>>
>>> From: David Crossland
>>> Sent: Tuesday, 18 March 2014 16:32
>>> To: user@storm.incubator.apache.org
>>>
>>> Being very new to storm I'
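
To make the "loop in the spout" suggestion above concrete, a rough sketch; the Message type, peekNextMessage(), deleteMessage(), isMonetise() and toTuple() are placeholders standing in for the Azure Service Bus and filtering code described earlier, not real APIs:

    @Override
    public void nextTuple() {
        // Sketch: keep pulling until a message worth emitting is found or the
        // bus is empty, so a filtered-out message does not waste a whole call.
        while (true) {
            Message m = peekNextMessage();   // placeholder for the service-bus read
            if (m == null) {
                return;                      // bus empty: let Storm back off
            }
            deleteMessage(m);                // placeholder: remove it from the subscription
            if (isMonetise(m)) {             // placeholder filter (~0.03% pass)
                collector.emit(toTuple(m));
                return;                      // emit one tuple per call
            }
            // otherwise loop and look at the next message
        }
    }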

How to specify StreamID in storm-kafka spout?

2014-07-14 Thread Link Wang
Dear all,

I'm using the KafkaSpout from 0.9.2-incubating, and I want my KafkaSpout to emit
more than one stream with given stream IDs, but it seems there's no way
to do this. Any help?
Below is my pom dependency for storm-kafka:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>

Link Wang


Re: How to specify StreamID in storm-kafka spout?

2014-07-14 Thread Link Wang
Thanks for your reply, but I mean that KafkaSpout doesn't provide a way to set the
stream ID. From its source code, I found this statement in the
PartitionManager class:
collector.emit(tup, new KafkaMessageId(partition, offset));
And this is the only emit call.

Vladi Feigin wrote:

>You need to pass the stream ID (you define it) in the SpoutOutputCollector emit
>method (first parameter).
>
>And you also need to pass the stream ID when you build the topology
>(TopologyBuilder), for example when you call the shuffleGrouping method.
>
>Vladi
>
>
>
>
>On Mon, Jul 14, 2014 at 11:50 AM, Link Wang  wrote:
>
>Dear all, 
>
>
>    I'm using the KafkaSpout from 0.9.2-incubating, and I want my KafkaSpout to emit
>more than one stream with given stream IDs, but it seems there's no way to
>do this. Any help?
>
>Below is my pom dependency for storm-kafka:
>
>        <dependency>
>            <groupId>org.apache.storm</groupId>
>            <artifactId>storm-kafka</artifactId>
>            <version>0.9.2-incubating</version>
>        </dependency>
>
>
>Link Wang
>
>
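
For context, the mechanism Vladi describes applies to spouts and bolts whose code you control; KafkaSpout in 0.9.2-incubating hard-codes the default stream, as noted above. A sketch of the general Storm API (the "updates" stream name and the MySpout/MyBolt classes are illustrative assumptions):

    // In a component you control: declare a named stream and emit to it.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("updates", new Fields("msg"));
    }

    // Inside nextTuple()/execute():
    collector.emit("updates", new Values(msg), msgId);

    // When wiring the topology, subscribe to that stream by name:
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("my-spout", new MySpout(), 1);
    builder.setBolt("my-bolt", new MyBolt(), 2)
           .shuffleGrouping("my-spout", "updates");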