Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2113#discussion_r120407010
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---
@@ -34,65 +39,119 @@ import org.apache.storm.tuple.Values;
public class ResourceAwareExampleTopology {
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
+  public static class ExclamationBolt extends BaseRichBolt {
+    //Have a crummy cache to show off shared memory accounting
+    private static final ConcurrentHashMap<String, String> myCrummyCache =
+        new ConcurrentHashMap<>();
+    private static final int CACHE_SIZE = 100_000;
+    OutputCollector _collector;
+
+    protected static String getFromCache(String key) {
+      return myCrummyCache.get(key);
+    }
+
+    protected static void addToCache(String key, String value) {
+      myCrummyCache.putIfAbsent(key, value);
+      int numToRemove = myCrummyCache.size() - CACHE_SIZE;
+      if (numToRemove > 0) {
+        //Remove something randomly...
+        Iterator<Entry<String, String>> it = myCrummyCache.entrySet().iterator();
+        for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
+          it.next();
+          it.remove();
+        }
+      }
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      String orig = tuple.getString(0);
+      String ret = getFromCache(orig);
+      if (ret == null) {
+        ret = orig + "!!!";
+        addToCache(orig, ret);
+      }
+      _collector.emit(tuple, new Values(ret));
+      _collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
}
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("word"));
+  public static void main(String[] args) throws Exception {
+    TopologyBuilder builder = new TopologyBuilder();
+
+    //A topology can set resources in terms of CPU and Memory for each component
+    // These can be chained (like with setting the CPU requirement)
+    SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10).setCPULoad(20);
+    // Or done separately like with setting the
+    // onheap and offheap memory requirement
+    spout.setMemoryLoad(64, 16);
+    //On heap memory is used to help calculate the heap of the Java process for the worker
+    // off heap memory is for things like JNI memory allocated off heap, or when using the
+    // ShellBolt or ShellSpout. In this case the 16 MB of off heap is just as an example
+    // as we are not using it.
+
+    // Sometimes a Bolt or Spout will have some memory that is shared between the instances
+    // These are typically caches, but could be anything like a static database that is memory
+    // mapped into the processes. These can be declared separately and added to the bolts and
+    // spouts that use them. Or if only one uses it they can be created inline with the add
+    SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
+    SharedOffHeapWithinNode notImplementedButJustAnExample =
+        new SharedOffHeapWithinNode(500, "not-implemented-node-level-cache");
+
+    //If CPU or memory is not set the values stored in topology.component.resources.onheap.memory.mb,
+    // topology.component.resources.offheap.memory.mb and topology.component.cpu.pcore.percent
+    // will be used instead
+    builder
+        .setBolt("exclaim1", new ExclamationBolt(), 3)
+        .shuffleGrouping("word")
+        .addSharedMemory(exclaimCache);
+
+    builder
+        .setBolt("exclaim2", new ExclamationBolt(), 2)
+        .shuffleGrouping("exclaim1")
+        .setMemoryLoad(100)
+        .addSharedMemory(exclaimCache)
+        .addSharedMemory(notImplementedButJustAnExample);
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    //In RAS the number of workers will be computed for you so you don't need to set
+    //conf.setNumWorkers(3);
+
+    // The size of a worker is limited by the amount of heap assigned to it and can be overridden by
+    conf.setTopologyWorkerMaxHeapSize(1024.0);
+    // This is to try and balance the time needed to devote to GC against not needing to
+    // serialize/deserialize tuples
+
+    //The priority of a topology describes the importance of the topology in decreasing order
+    // starting from 0 (i.e. 0 is the highest priority and importance decreases as the priority number increases).
+    //Recommended range of 0-29 but no hard limit set.
+    // If there are not enough resources in a cluster the priority in combination with how far over their guarantees
+    // a user is will decide which topologies are run and which ones are not.
+    conf.setTopologyPriority(29);
+
+    //set to use the default resource aware strategy when using the MultitenantResourceAwareBridgeScheduler
+    conf.setTopologyStrategy(
+        "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
+
+    String topoName = "test";
+    if (args != null && args.length > 0) {
+      topoName = args[0];
+    }
+    //Not needed on RAS
--- End diff ---
This was already mentioned
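
For readers following along: the diff above cuts off before the topology is actually submitted. Below is a minimal sketch of the whole RAS flow it demonstrates, ending with a submission step. Treat it as an illustration under assumptions rather than code from this PR: the org.apache.storm.topology import path for SharedOnHeap is assumed (the diff does not show imports), the String-based Config.setTopologyStrategy call simply mirrors what the diff uses, TestWordSpout is the existing test spout, and ExclamationBolt is the class defined in the example above.

// Minimal sketch, not the PR's code. Assumptions:
//  - SharedOnHeap lives in org.apache.storm.topology (import path not shown in the diff)
//  - Config.setTopologyStrategy accepts the strategy class name as a String, as the diff does
//  - ExclamationBolt is the nested class defined in ResourceAwareExampleTopology above
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.ResourceAwareExampleTopology.ExclamationBolt;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.SharedOnHeap;
import org.apache.storm.topology.TopologyBuilder;

public class RasSubmitSketch {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // Per-executor requests: 20% of a core, 64 MB on-heap and 16 MB off-heap.
    builder.setSpout("word", new TestWordSpout(), 10)
        .setCPULoad(20)
        .setMemoryLoad(64, 16);

    // A 100 MB on-heap region shared by the bolt's executors rather than counted per executor.
    SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
    builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("word")
        .addSharedMemory(exclaimCache);

    Config conf = new Config();
    conf.setTopologyWorkerMaxHeapSize(1024.0); // cap on any single worker's heap, in MB
    conf.setTopologyPriority(29);              // 0 is the highest priority
    conf.setTopologyStrategy(
        "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");

    // No conf.setNumWorkers(...): under RAS the number of workers is computed for you.
    String topoName = (args != null && args.length > 0) ? args[0] : "resource-aware-example";
    StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
  }
}

If the final patch places SharedOnHeap elsewhere or keeps a Class-based setTopologyStrategy signature, the same flow applies with those two lines adjusted.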