Unfortunately, we cannot determine in advance which tuples will be expensive,
so we can only use a single queue, and this approach would not work for us.

Thanks
Tyson

On Oct 24, 2014, at 4:04 AM, Itai Frenkel <[email protected]> wrote:

I'm not a Storm expert, but I'll give it a try.
What you are describing is a system whose resource (CPU/memory/network)
consumption varies widely depending on the input. Usually shuffle grouping would
be enough, since given a large enough number of events and a small enough
resource consumption per event, the load would even out. Clearly this is not
your case, which means that there is either a relatively small number of events
or a very high variance between events. Here is a naive solution to this
problem; I'm not sure it fits your requirements, though:

1. Deploy each topology on a single machine.
2. Each topology (= machine) has a single spout. All spouts read from the same
centralized queue service.
3. The queue service has two queues: one for "cheap" events and one for "very
expensive" events.
4. The spout (by design in Storm) decides whether to answer nextTuple with an
emit or to do nothing. It could use internal monitoring (such as Codahale
metrics/JMX/SIGAR) to regulate the throughput: on each call it could do
nothing, emit a cheap event, or emit an expensive event.
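Step 4 can be sketched as a small decision function that a spout's nextTuple() might call. This is a hypothetical illustration, not a Storm spout: the two in-memory queues stand in for the centralized queue service, `localLoad` stands in for whatever JMX/Codahale/SIGAR reading the spout uses (assumed here to be normalized to [0, 1]), and the class name and both thresholds are made up for the example.

```java
import java.util.Queue;
import java.util.function.DoubleSupplier;

/**
 * Hypothetical sketch of the choice a spout's nextTuple() could make in the
 * two-queue design. NOT a Storm spout: the queues stand in for the centralized
 * queue service, and localLoad stands in for local monitoring (JMX, Codahale
 * metrics, SIGAR), assumed to report utilization in [0, 1].
 */
class TwoQueueSpoutLogic {
    private final Queue<String> cheap;
    private final Queue<String> expensive;
    private final DoubleSupplier localLoad;
    private final double busyThreshold;      // at or above this load: emit nothing
    private final double expensiveThreshold; // below this load: try an expensive event first

    TwoQueueSpoutLogic(Queue<String> cheap, Queue<String> expensive,
                       DoubleSupplier localLoad,
                       double busyThreshold, double expensiveThreshold) {
        this.cheap = cheap;
        this.expensive = expensive;
        this.localLoad = localLoad;
        this.busyThreshold = busyThreshold;
        this.expensiveThreshold = expensiveThreshold;
    }

    /** Returns the event to emit, or null to "do nothing" on this nextTuple() call. */
    String nextEvent() {
        double load = localLoad.getAsDouble();
        if (load >= busyThreshold) {
            return null; // machine saturated: skip this call entirely
        }
        if (load < expensiveThreshold) {
            String event = expensive.poll();
            if (event != null) {
                return event; // plenty of headroom: take expensive work
            }
        }
        return cheap.poll(); // moderate load, or no expensive work queued (null if empty)
    }
}
```

In an actual spout, nextTuple() would call nextEvent() and emit only when the result is non-null. Because each machine only pulls work when it has headroom, load balancing falls out of the pull model without any cross-worker coordination.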

This (somewhat naive) solution does not require coordination through
ZooKeeper and is easier to code.

Regards,
Itai

________________________________
From: Tyson Norris <[email protected]>
Sent: Friday, October 24, 2014 7:09 AM
To: <[email protected]>
Subject: CustomStreamGrouping for load-based routing

Hi -
We are concerned that load may accumulate on certain workers and negatively
impact the efficiency of the system.

Would a CustomStreamGrouping based on broadcasting load stats be a reasonable
way to deal with this?

For example, in our system, the content stream and processing overhead can be
adjusted by external systems, creating a stream whose processing cost varies
per tuple.

We are currently using shuffleGrouping, so there is a chance that the most
“expensive” tuples will land on the same task for processing. That would be
bad for those tuples, since other tasks might be underutilized by comparison.
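To put rough numbers on both this concern and the "it evens out" argument in the reply above, the sketch below models shuffle grouping as independent, uniform random task choice. That model is a simplifying assumption; Storm's actual shuffle implementation may distribute tuples differently, so treat the numbers as illustrative only. The class name, the uniform [0, 1) cost distribution, and the fixed seed are all made up for the example.

```java
import java.util.Random;

/**
 * Rough model of shuffle grouping as independent, uniform random task choice.
 * This is a simplifying assumption; Storm's actual shuffle implementation may
 * distribute tuples differently.
 */
class ShuffleModel {
    /**
     * Probability that at least two of k expensive tuples land on the same one
     * of n tasks (the "birthday problem"): 1 - n(n-1)...(n-k+1) / n^k.
     */
    static double probSomeTaskGetsTwo(int nTasks, int kExpensive) {
        if (kExpensive > nTasks) {
            return 1.0; // pigeonhole: some task must receive two
        }
        double allDistinct = 1.0;
        for (int i = 0; i < kExpensive; i++) {
            allDistinct *= (double) (nTasks - i) / nTasks;
        }
        return 1.0 - allDistinct;
    }

    /**
     * Randomly assigns nEvents events with costs drawn uniformly from [0, 1)
     * and returns max task load / mean task load (1.0 = perfectly even).
     */
    static double imbalance(int nTasks, int nEvents, long seed) {
        Random rnd = new Random(seed);
        double[] load = new double[nTasks];
        double total = 0.0;
        for (int i = 0; i < nEvents; i++) {
            double cost = rnd.nextDouble();
            load[rnd.nextInt(nTasks)] += cost;
            total += cost;
        }
        double max = 0.0;
        for (double l : load) {
            max = Math.max(max, l);
        }
        return max / (total / nTasks);
    }

    public static void main(String[] args) {
        System.out.println("P(collision), 4 expensive tuples on 10 tasks: "
                + probSomeTaskGetsTwo(10, 4));
        for (int events : new int[] {20, 2_000, 200_000}) {
            System.out.println(events + " events, max/mean load: "
                    + imbalance(10, events, 42L));
        }
    }
}
```

Under this model, 4 expensive tuples on 10 tasks collide with probability 1 - (10·9·8·7)/10^4 = 0.496, while the max/mean ratio for many small events is expected to approach 1.0 as the event count grows. Both effects are real: averages even out, but a handful of very expensive tuples can still pile up on one task.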

fieldsGrouping isn’t helpful either, since it may also route load
disproportionately across the cluster.

So, I’m wondering if we can create a CustomStreamGrouping that will:
- periodically post worker stats (heap/CPU usage) + worker task assignments to
ZooKeeper
- periodically read the worker stats + task assignments of other workers from
ZooKeeper
- in the chooseTasks() implementation, refer to the worker stats to determine
which worker is least loaded, and use the task IDs for that worker to route the
tuple.
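The third step (routing inside chooseTasks()) might look roughly like the sketch below. It is hypothetical: it mirrors the shape of `chooseTasks(int taskId, List<Object> values)` but does not implement Storm's CustomStreamGrouping interface, the class and `updateWorker` names are made up, and the periodic ZooKeeper publish/read is assumed to happen elsewhere and feed `updateWorker`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical sketch of the proposed routing step. Mirrors the shape of
 * chooseTasks(int taskId, List<Object> values) but is not a Storm
 * CustomStreamGrouping; publishing and reading stats via ZooKeeper is not shown.
 */
class LeastLoadedRouter {
    // workerId -> latest load score (e.g. CPU fraction), as read from ZooKeeper
    private final Map<String, Double> workerLoad = new HashMap<>();
    // workerId -> target task IDs hosted on that worker
    private final Map<String, List<Integer>> workerTasks = new HashMap<>();
    private final Random random = new Random();

    /** Called by the (not shown) periodic refresh with another worker's stats. */
    void updateWorker(String workerId, double load, List<Integer> targetTaskIds) {
        workerLoad.put(workerId, load);
        workerTasks.put(workerId, new ArrayList<>(targetTaskIds));
    }

    /** Routes the tuple to one task on the least-loaded worker that hosts target tasks. */
    List<Integer> chooseTasks(int taskId, List<Object> values) {
        String best = null;
        double bestLoad = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> entry : workerLoad.entrySet()) {
            List<Integer> tasks = workerTasks.get(entry.getKey());
            if (tasks.isEmpty()) {
                continue; // worker hosts none of this stream's target tasks
            }
            if (entry.getValue() < bestLoad) {
                bestLoad = entry.getValue();
                best = entry.getKey();
            }
        }
        if (best == null) {
            throw new IllegalStateException("no worker stats available yet");
        }
        List<Integer> tasks = workerTasks.get(best);
        // Spread tuples across that worker's tasks rather than always picking the first one
        return Collections.singletonList(tasks.get(random.nextInt(tasks.size())));
    }
}
```

One design point worth weighing: because stats refresh only periodically, every upstream task would send to the same "least-loaded" worker until the next refresh, which can overload it. Picking randomly among the few least-loaded workers, or weighting by inverse load, would soften that. A real version would also need a thread-safe stats snapshot and a fallback (such as random choice) before any stats have been read.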


I haven’t tried this yet, but I know there is an issue
(https://issues.apache.org/jira/browse/STORM-162), created a while back, that
suggests adding load balancing to shuffle grouping; this seems like a simpler
alternative.

Thanks
Tyson