[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2113


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120688606
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
@@ -24,8 +24,10 @@
 import org.apache.storm.topology.TopologyBuilder;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
--- End diff --

No, don't worry about it. I hadn't noticed this was in a test.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120688356
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
@@ -24,8 +24,10 @@
 import org.apache.storm.topology.TopologyBuilder;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
--- End diff --

Checkstyle does not check tests for some reason, but if you care about it, I 
can change it.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120687251
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
@@ -21,41 +21,52 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.WorkerResources;
+
 public interface SchedulerAssignment {
 /**
  * Does this slot occupied by this assignment?
  * @param slot
- * @return
+ * @return true if the slot is occupied else false
  */
 public boolean isSlotOccupied(WorkerSlot slot);
 
 /**
- * is the executor assigned?
+ * Is the executor assigned?
  * 
  * @param executor
- * @return
+ * @return true if it is assigned else false
  */
 public boolean isExecutorAssigned(ExecutorDetails executor);
 
 /**
- * get the topology-id this assignment is for.
- * @return
+ * Return the ID of the topolgoy.
--- End diff --

I even typo my fixes.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120676761
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
 ---
@@ -29,8 +34,12 @@
  */
 public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
 
-private Map resources = new HashMap<>();
-private static Map conf = Utils.readStormConfig();
+//@{link org.apache.storm.trident.planner.Node} and several other tirdent classes inherit from DefaultResourceDeclarer
--- End diff --

tirdent -> trident


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120677469
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
@@ -44,6 +46,11 @@ public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
 config1.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
 config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
 config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
-Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1);
+try {
+Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1);
+fail("Expected exeption not thrown");
--- End diff --

exeption -> exception


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120676803
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
 ---
@@ -29,8 +34,12 @@
  */
 public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
 
-private Map resources = new HashMap<>();
-private static Map conf = Utils.readStormConfig();
+//@{link org.apache.storm.trident.planner.Node} and several other tirdent classes inherit from DefaultResourceDeclarer
+// These classes are serialized out as part of the bolts and spouts of a topology, often for each bolt/spout in the topology.
+// The following are marked as transiant because they are never used after the topology is created so keeping them around just wasts
--- End diff --

wasts -> wastes


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120675985
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
@@ -21,41 +21,52 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.WorkerResources;
+
 public interface SchedulerAssignment {
 /**
  * Does this slot occupied by this assignment?
  * @param slot
- * @return
+ * @return true if the slot is occupied else false
  */
 public boolean isSlotOccupied(WorkerSlot slot);
 
 /**
- * is the executor assigned?
+ * Is the executor assigned?
  * 
  * @param executor
- * @return
+ * @return true if it is assigned else false
  */
 public boolean isExecutorAssigned(ExecutorDetails executor);
 
 /**
- * get the topology-id this assignment is for.
- * @return
+ * Return the ID of the topolgoy.
--- End diff --

topolgoy -> topology


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120677418
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
@@ -24,8 +24,10 @@
 import org.apache.storm.topology.TopologyBuilder;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
--- End diff --

I think Checkstyle bans wildcard imports like this.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120671482
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -144,8 +148,19 @@ public Void call() throws Exception {
 File tr = new File(tmproot);
 try {
 downloadBaseBlobs(tr);
+if (_assignment.is_set_total_node_shared()) {
+File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
+//We need to create a directory for shared memory to write to (we should not encourage this though)
+Path path = sharedMemoryDirTmpLocation.toPath();
+Files.createDirectories(path);
+}
 _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
+Map topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+_fsOps.setupStormCodeDir(topoConf, _stormRoot);
+if (_assignment.is_has_node_shared_memory()) {
+File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
--- End diff --

I can add it to the documentation, but I didn't spend a lot of time 
documenting it on purpose. Shared off-heap memory is hard to get right, and I 
would rather discourage people from using it.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120669238
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -144,8 +148,19 @@ public Void call() throws Exception {
 File tr = new File(tmproot);
 try {
 downloadBaseBlobs(tr);
+if (_assignment.is_set_total_node_shared()) {
+File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
+//We need to create a directory for shared memory to write to (we should not encourage this though)
+Path path = sharedMemoryDirTmpLocation.toPath();
+Files.createDirectories(path);
+}
 _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
+Map topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+_fsOps.setupStormCodeDir(topoConf, _stormRoot);
+if (_assignment.is_has_node_shared_memory()) {
+File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
--- End diff --

Okay. I was just thinking it might be helpful if the topology could get the 
directory path (or _stormRoot), so users don't have to work out the path to 
this directory themselves.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120668301
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java 
---
@@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map resou
 }
 }
 
-if (totalMem != null) {
-MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
-try {
-memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
-} catch (IOException e) {
-throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
+if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
+if (totalMem != null) {
+int cgroupMem =
+(int)
+(Math.ceil(
+ObjectReader.getDouble(
+this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
--- End diff --

Makes sense
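For readers following the CgroupManager hunk above: enforcement is gated on `STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE`, and a tolerance margin is read from `STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB`, but the quoted expression is cut off. A minimal sketch of the arithmetic, assuming the margin is simply added to the worker's requested memory before rounding up to whole MB and converting to bytes; `CgroupLimitSketch` and `limitInBytes` are hypothetical names, not Storm code:

```java
/**
 * Hypothetical helper, not Storm's CgroupManager: computes a value for
 * memory.limit_in_bytes from a requested size plus an assumed tolerance margin.
 */
final class CgroupLimitSketch {
    private static final long BYTES_PER_MB = 1024L * 1024L;

    private CgroupLimitSketch() {
    }

    /**
     * @param enforcementEnabled mirrors the enforcement flag checked in the diff
     * @param totalMemMb the worker's requested memory in MB, or null if unknown
     * @param marginMb extra headroom in MB (assumption: added to the request)
     * @return the limit in bytes, or -1 if no limit should be set
     */
    static long limitInBytes(boolean enforcementEnabled, Double totalMemMb, double marginMb) {
        if (!enforcementEnabled || totalMemMb == null) {
            return -1L; // leave the cgroup memory limit untouched
        }
        // Round up to whole MB, then multiply as long so large limits cannot overflow.
        long limitMb = (long) Math.ceil(totalMemMb + marginMb);
        return limitMb * BYTES_PER_MB;
    }
}
```

The multiplication is done in `long`: an MB count held in an `int` and multiplied by `1024 * 1024` in `int` arithmetic overflows from 2048 MB upward.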


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120668119
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java 
---
@@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
 return addConfiguration(Config.TOPOLOGY_TASKS, val);
 }
 
+@SuppressWarnings("unchecked")
 @Override
 public T setMemoryLoad(Number onHeap) {
 if (onHeap != null) {
--- End diff --

Thanks


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120668059
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -19,36 +19,82 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-//TODO: improve this by maintaining slot -> executors as well for more efficient operations
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class SchedulerAssignmentImpl implements SchedulerAssignment {
+private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
+
 /**
  * topology-id this assignment is for.
  */
-String topologyId;
+private final String topologyId;
+
 /**
  * assignment detail, a mapping from executor to WorkerSlot
  */
-Map executorToSlot;
-
-public SchedulerAssignmentImpl(String topologyId, Map executorToSlots) {
-this.topologyId = topologyId;
-this.executorToSlot = new HashMap<>(0);
-if (executorToSlots != null) {
-this.executorToSlot.putAll(executorToSlots);
+private final Map executorToSlot = new HashMap<>();
+private final Map resources = new HashMap<>();
+private final Map totalSharedOffHeap = new HashMap<>();
+
+public SchedulerAssignmentImpl(String topologyId, Map executorToSlot,
+Map resources, Map totalSharedOffHeap) {
+this.topologyId = topologyId;   
+if (executorToSlot != null) {
--- End diff --

Ok. I'm the opposite way: I prefer avoiding null checks. But this is fine :)


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120658098
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -144,8 +148,19 @@ public Void call() throws Exception {
 File tr = new File(tmproot);
 try {
 downloadBaseBlobs(tr);
+if (_assignment.is_set_total_node_shared()) {
+File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
+//We need to create a directory for shared memory to write to (we should not encourage this though)
+Path path = sharedMemoryDirTmpLocation.toPath();
+Files.createDirectories(path);
+}
 _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
+Map topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+_fsOps.setupStormCodeDir(topoConf, _stormRoot);
+if (_assignment.is_has_node_shared_memory()) {
+File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
--- End diff --

That is up to the topology. It could be JNI code, or even something that a 
ShellBolt or ShellSpout does. The point is to provide a place for them to 
store the data if they need it. We have a group doing this in /tmp right now, 
and it causes all kinds of issues when things crash or get rescheduled and data is leaked.
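In the AsyncLocalizer diff, a `shared_by_topology` directory is created in the temporary download root when the assignment carries node-shared memory, and is then moved along with everything else into the topology's storm root. A minimal sketch of that create-then-locate pattern with `java.nio.file`; the class and method names, and the idea of handing the storm root to the component explicitly (the convenience srdo asks about above), are assumptions for illustration rather than Storm's API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch; only the directory name comes from the AsyncLocalizer diff. */
final class SharedDirSketch {
    static final String SHARED_DIR_NAME = "shared_by_topology";

    private SharedDirSketch() {
    }

    /** Create the shared directory inside a staging root before that root is moved into place. */
    static Path prepare(Path stagingRoot, boolean hasSharedMemory) throws IOException {
        Path dir = stagingRoot.resolve(SHARED_DIR_NAME);
        if (hasSharedMemory) {
            // createDirectories does nothing if the directory already exists.
            Files.createDirectories(dir);
        }
        return dir;
    }

    /** How a component could locate the directory, assuming it is told the storm root. */
    static Path locate(Path stormRoot) {
        return stormRoot.resolve(SHARED_DIR_NAME);
    }
}
```

Because the directory lives under the topology's own root instead of /tmp, it is cleaned up together with the rest of the topology's files.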


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120656010
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -528,28 +577,108 @@ public void cleanUpForRestart() throws IOException {
 deleteSavedWorkerUser();
 _workerId = null;
 }
-
+
+/**
+ * Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean
+ * that it just went over the limit.
+ * @throws IOException on any error
+ */
+public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
--- End diff --

Yes, but it is overridden by other Container implementations, and they may 
want to throw an exception.
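The reply rests on Java's override rule: an overriding method may not declare checked exceptions that are new or broader than those of the method it overrides, so the base method has to keep `throws IOException` even if its own body never throws. A minimal sketch with hypothetical classes and a simplified signature (the real `isMemoryLimitViolated` takes a `LocalAssignment`):

```java
import java.io.IOException;

/** Hypothetical base class: its body cannot throw, but the signature leaves room for subclasses. */
class ContainerSketch {
    public boolean isMemoryLimitViolated(long usedMb, long limitMb) throws IOException {
        return usedMb > limitMb;
    }
}

/** Hypothetical subclass that reads usage from something that can fail, such as a file. */
class FileBackedContainerSketch extends ContainerSketch {
    private final boolean readFails;

    FileBackedContainerSketch(boolean readFails) {
        this.readFails = readFails;
    }

    @Override
    public boolean isMemoryLimitViolated(long usedMb, long limitMb) throws IOException {
        if (readFails) {
            // Only compiles because the overridden method also declares IOException.
            throw new IOException("could not read memory usage");
        }
        return super.isMemoryLimitViolated(usedMb, limitMb);
    }
}
```

Dropping `throws IOException` from `ContainerSketch` would make the subclass fail to compile.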


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120654636
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java 
---
@@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map resou
 }
 }
 
-if (totalMem != null) {
-MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
-try {
-memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
-} catch (IOException e) {
-throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
+if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
+if (totalMem != null) {
+int cgroupMem =
+(int)
+(Math.ceil(
+ObjectReader.getDouble(
+this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
--- End diff --

Except in some unit tests.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120648513
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
 ---
@@ -29,8 +33,8 @@
  */
 public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
 
-private Map resources = new HashMap<>();
-private static Map conf = Utils.readStormConfig();
+private final transient Map resources = new HashMap<>();
--- End diff --

In Trident, Node inherits from DefaultResourceDeclarer, and Node is 
serialized out (repeatedly, for different bolts and spouts in the topology). 
These fields are never used after the topology is created, so marking them 
transient reduces the serialized size of a Trident topology.

I'll add a comment in the code to make it clear why.
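The reasoning relies on how Java serialization treats `transient` fields: they are skipped when the object is written, and they come back as `null` when it is read, even if declared `final` with an initializer. A minimal sketch, assuming plain Java serialization, comparing the serialized size of two hypothetical stand-ins for a Trident `Node`; the class names and the `"memory.onheap"` key are illustrative only:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for a class that carries declaration-time resource data. */
final class TransientSketch {
    private TransientSketch() {
    }

    static class WithField implements Serializable {
        private static final long serialVersionUID = 1L;
        final Map<String, Number> resources = new HashMap<>();
    }

    static class WithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        // Not written by Java serialization; reads back as null after deserialization.
        final transient Map<String, Number> resources = new HashMap<>();
    }

    /** Number of bytes produced by writing one object with ObjectOutputStream. */
    static int serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.size();
    }
}
```

The trade-off is that any code reading the field after deserialization sees `null`, which is only safe because, as stated above, the data is never used once the topology is built.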


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120647251
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java 
---
@@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
 return addConfiguration(Config.TOPOLOGY_TASKS, val);
 }
 
+@SuppressWarnings("unchecked")
 @Override
 public T setMemoryLoad(Number onHeap) {
 if (onHeap != null) {
--- End diff --

I filed https://issues.apache.org/jira/browse/STORM-2545 for it


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120646636
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java 
---
@@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
 return addConfiguration(Config.TOPOLOGY_TASKS, val);
 }
 
+@SuppressWarnings("unchecked")
 @Override
 public T setMemoryLoad(Number onHeap) {
 if (onHeap != null) {
--- End diff --

I agree, but that code predates this pull request. If we want to make that 
change, I would prefer to do it in a separate pull request, so that a breaking 
change does not sneak in as part of addressing review comments.
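The hunk under discussion adds `@SuppressWarnings("unchecked")` to `setMemoryLoad`, which returns the type parameter `T`. One common reason such a method needs the annotation is a self-typed fluent API that returns `(T) this`; whether that is exactly what triggers the warning in `BaseConfigurationDeclarer` is not visible in the quoted lines. A minimal sketch of the pattern; `DeclarerSketch`, `BoltDeclarerSketch`, and the config keys are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified fluent declarer; not Storm's BaseConfigurationDeclarer. */
abstract class DeclarerSketch<T extends DeclarerSketch<T>> {
    final Map<String, Object> conf = new HashMap<>();

    @SuppressWarnings("unchecked")
    public T setMemoryLoad(Number onHeap) {
        if (onHeap != null) {
            conf.put("onheap.mb", onHeap.doubleValue()); // hypothetical key
        }
        // The compiler cannot prove that `this` is a T, so the cast is unchecked.
        return (T) this;
    }
}

/** A concrete declarer; returning T lets calls chain into subclass-only methods. */
final class BoltDeclarerSketch extends DeclarerSketch<BoltDeclarerSketch> {
    BoltDeclarerSketch setNumTasks(int n) {
        conf.put("tasks", n); // hypothetical key
        return this;
    }
}
```

For example, `new BoltDeclarerSketch().setMemoryLoad(256).setNumTasks(2)` compiles because `setMemoryLoad` returns `BoltDeclarerSketch` rather than the base type.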


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120645730
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -141,4 +222,42 @@ public String getTopologyId() {
 }
 return ret;
 }
-}
+
+@Override
+public Map getScheduledResources() {
+return resources;
+}
+
+public void setTotalSharedOffHeapMemory(String node, double value) {
+totalSharedOffHeap.put(node, value);
+}
+
+@Override
+public Map getTotalSharedOffHeapMemory() {
+return totalSharedOffHeap;
+}
+
+/**
+ * Update the resources for this assignment (This should go aware when the RAS-MT bridge goes away
--- End diff --

Sorry, that is code I forgot to remove; it is not needed here.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-07 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120644839
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -19,36 +19,82 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-//TODO: improve this by maintaining slot -> executors as well for more efficient operations
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class SchedulerAssignmentImpl implements SchedulerAssignment {
+private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
+
 /**
  * topology-id this assignment is for.
  */
-String topologyId;
+private final String topologyId;
+
 /**
  * assignment detail, a mapping from executor to WorkerSlot
  */
-Map executorToSlot;
-
-public SchedulerAssignmentImpl(String topologyId, Map executorToSlots) {
-this.topologyId = topologyId;
-this.executorToSlot = new HashMap<>(0);
-if (executorToSlots != null) {
-this.executorToSlot.putAll(executorToSlots);
+private final Map executorToSlot = new HashMap<>();
+private final Map resources = new HashMap<>();
+private final Map totalSharedOffHeap = new HashMap<>();
+
+public SchedulerAssignmentImpl(String topologyId, Map executorToSlot,
+Map resources, Map totalSharedOffHeap) {
+this.topologyId = topologyId;   
+if (executorToSlot != null) {
--- End diff --

I prefer to be very lenient with inputs: if something can be null, I don't 
want to force callers to never pass null.
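The two positions in this exchange correspond to two constructor styles. A minimal sketch of the lenient one, where a `null` map is treated like an empty one and inputs are copied so later changes by the caller do not leak in; `AssignmentSketch` is hypothetical and uses `String` keys and values in place of `ExecutorDetails` and `WorkerSlot`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified version of the null-tolerant constructor in the diff. */
final class AssignmentSketch {
    private final String topologyId;
    private final Map<String, String> executorToSlot = new HashMap<>();
    private final Map<String, Double> totalSharedOffHeap = new HashMap<>();

    AssignmentSketch(String topologyId, Map<String, String> executorToSlot,
                     Map<String, Double> totalSharedOffHeap) {
        this.topologyId = topologyId;
        // Lenient: null means "nothing assigned yet", same as an empty map.
        if (executorToSlot != null) {
            this.executorToSlot.putAll(executorToSlot);
        }
        if (totalSharedOffHeap != null) {
            this.totalSharedOffHeap.putAll(totalSharedOffHeap);
        }
    }

    String getTopologyId() {
        return topologyId;
    }

    Map<String, String> getExecutorToSlot() {
        return Collections.unmodifiableMap(executorToSlot);
    }

    Map<String, Double> getTotalSharedOffHeapMemory() {
        return Collections.unmodifiableMap(totalSharedOffHeap);
    }
}
```

The stricter style srdo prefers would reject `null` up front, for example with `Objects.requireNonNull(executorToSlot, "executorToSlot")`, leaving the empty-map choice to callers. Returning read-only views is a choice of this sketch, not something the quoted diff shows.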


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120437166
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java 
---
@@ -20,23 +20,12 @@
 public class WorkerSlot {
 private String nodeId;
--- End diff --

Nit: Can these be final?


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120454284
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -559,9 +688,17 @@ public void cleanUpForRestart() throws IOException {
 public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException;
 
 /**
- * @return the id of the container or null if there is no worker id right now.
+ * Get the id of the container or null if there is no worker id right now.
  */
 public String getWorkerId() {
 return _workerId;
 }
+
+/**
+ * Get the topology id this is a part of.
+ */
+public String getTopoogyId() {
--- End diff --

topoogy -> topology


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120436933
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java 
---
@@ -20,23 +20,12 @@
 public class WorkerSlot {
 private String nodeId;
 private int port;
-// amount of on-heap memory allocated to it
-private double memOnHeap = 0.0;
-// amount of off-heap memory allocated to it
-private double memOffHeap = 0.0;
-// amount of cpu allocated to it
-private double cpu = 0.0;
-
-public WorkerSlot(String nodeId, Number port) {
-this(nodeId, port, 0.0, 0.0, 0.0);
-}
 
-public WorkerSlot(String nodeId, Number port, double memOnHeap, double memOffHeap, double cpu) {
+public WorkerSlot(String nodeId, Number port) {
+if (port == null) throw new NullPointerException("port cannot be null");
--- End diff --

I think Checkstyle will complain about this because of the missing braces. 
Maybe you could use Guava's Preconditions.checkNotNull instead?
http://google.github.io/guava/releases/snapshot/api/docs/com/google/common/base/Preconditions.html#checkNotNull(T)
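As an alternative to Guava's `Preconditions.checkNotNull`, the JDK's `java.util.Objects.requireNonNull` (Java 7 and later) throws a `NullPointerException` with a given message and avoids both the extra dependency and the brace-less `if`. A minimal sketch that also makes the fields `final`, as the earlier nit asks; `WorkerSlotSketch` is hypothetical and not necessarily what was merged:

```java
import java.util.Objects;

/** Hypothetical sketch of WorkerSlot with the review nits applied. */
final class WorkerSlotSketch {
    private final String nodeId;
    private final int port;

    WorkerSlotSketch(String nodeId, Number port) {
        this.nodeId = nodeId;
        // Throws NullPointerException("port cannot be null") when port is null,
        // replacing the single-line `if (...) throw` that Checkstyle flags.
        this.port = Objects.requireNonNull(port, "port cannot be null").intValue();
    }

    String getNodeId() {
        return nodeId;
    }

    int getPort() {
        return port;
    }
}
```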


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120453941
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -528,28 +577,108 @@ public void cleanUpForRestart() throws IOException {
 deleteSavedWorkerUser();
 _workerId = null;
 }
-
+
+/**
+ * Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean
+ * that it just went over the limit.
+ * @throws IOException on any error
+ */
+public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
--- End diff --

This doesn't seem like it can throw IOException


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120435642
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -141,4 +222,42 @@ public String getTopologyId() {
 }
 return ret;
 }
-}
+
+@Override
+public Map getScheduledResources() {
+return resources;
+}
+
+public void setTotalSharedOffHeapMemory(String node, double value) {
+totalSharedOffHeap.put(node, value);
+}
+
+@Override
+public Map getTotalSharedOffHeapMemory() {
+return totalSharedOffHeap;
+}
+
+/**
+ * Update the resources for this assignment (This should go aware when the RAS-MT bridge goes away
--- End diff --

Nit: aware -> away. What is MT btw?


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120456638
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
@@ -46,53 +44,43 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Collection;
 import java.util.Collections;
 
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genExecsAndComps;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
 
 public class TestResourceAwareScheduler {
 
-private final String TOPOLOGY_SUBMITTER = "jerry";
-
 private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
 
 private static int currentTime = 1450418597;
 
-private static final Config defaultTopologyConf = new Config();
+private static final Config defaultTopologyConf = createClusterConfig(10, 128, 0, null);
 
 @BeforeClass
 public static void initConf() {
-defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
-defaultTopologyConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
-defaultTopologyConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
-
-defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
-defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
-defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
-defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+  //TODO clean this up some more
--- End diff --

Todo


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120451243
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -728,8 +758,145 @@ protected String javaCmd(String cmd) {
 
 return commandList;
 }
+
+  @Override
+  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
+if (super.isMemoryLimitViolated(withUpdatedLimits)) {
+  return true;
+}
+if (_resourceIsolationManager != null) {
+  // In the short term the goal is to not shoot anyone unless we really need to.
+  // The on heap should limit the memory usage in most cases to a reasonable amount
+  // If someone is using way more than they requested this is a bug and we should
+  // not allow it
+  long usageMb;
+  long memoryLimitMb;
+  long hardMemoryLimitOver;
+  String typeOfCheck;
+
+  if (withUpdatedLimits.is_has_node_shared_memory()) {
+//We need to do enforcement on a topology level, not a single worker level...
+// Because in for cgroups each page in shared memory goes to the worker that touched it
+// first. We may need to make this more plugable in the future and let the resource
+// isolation manager tell us what to do
+usageMb = getTotalTopologyMemoryUsed();
+memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
+hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
+typeOfCheck = "TOPOLOGY " + _topologyId;
+  } else {
+usageMb = getMemoryUsageMb();
+memoryLimitMb = this.memoryLimitMB;
+hardMemoryLimitOver = this.hardMemoryLimitOver;
+typeOfCheck = "WORKER " + _workerId;
+  }
+  LOG.debug(
+  "Enforcing memory usage for {} with usgae of {} out of {} total and a hard limit of {}",
--- End diff --

usgae -> usage


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120438318
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java ---
@@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
 return addConfiguration(Config.TOPOLOGY_TASKS, val);
 }
 
+@SuppressWarnings("unchecked")
 @Override
 public T setMemoryLoad(Number onHeap) {
 if (onHeap != null) {
--- End diff --

I'm wondering if it would be nicer to throw an NPE if parameters to these functions are null? Failing quietly could be confusing.
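To illustrate the difference being raised, here is a hypothetical, simplified declarer (not Storm's `BaseConfigurationDeclarer`) with a lenient setter that mirrors the `if (onHeap != null)` guard in the diff and a strict one that fails fast. The config key string is taken from the example topology elsewhere in this PR; the class, method names, and storage as a plain map are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified declarer contrasting the two behaviours.
final class MemoryLoadDeclarerSketch {
    static final String ON_HEAP_KEY = "topology.component.resources.onheap.memory.mb";
    private final Map<String, Object> conf = new HashMap<>();

    // Mirrors the guard in the diff: a null argument is silently ignored.
    MemoryLoadDeclarerSketch setMemoryLoadLenient(Number onHeap) {
        if (onHeap != null) {
            conf.put(ON_HEAP_KEY, onHeap.doubleValue());
        }
        return this;
    }

    // Suggested alternative: a null argument fails fast with a clear message.
    MemoryLoadDeclarerSketch setMemoryLoadStrict(Number onHeap) {
        Objects.requireNonNull(onHeap, "onHeap cannot be null");
        conf.put(ON_HEAP_KEY, onHeap.doubleValue());
        return this;
    }

    Map<String, Object> getConf() {
        return conf;
    }
}
```

With the lenient version a `null` simply leaves the config unchanged, which is the quiet failure described above; the strict version surfaces the mistake at the call site.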


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120434212
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -61,44 +107,79 @@ public int hashCode() {
 
 @Override
 public boolean equals(Object other) {
-if (other == this) return true;
-if (other instanceof SchedulerAssignmentImpl) {
SchedulerAssignmentImpl sother = (SchedulerAssignmentImpl) other;
-return topologyId.equals(sother.topologyId) &&
-executorToSlot.equals(sother.executorToSlot);
+if (!equalsIgnoreResources(other)) {
+return false;
 }
-return false;
+SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
+//Normalize some things
+Map selfResources = this.resources;
+if (selfResources == null) selfResources = Collections.emptyMap();
--- End diff --

These maps are never null?


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120451872
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -728,8 +758,145 @@ protected String javaCmd(String cmd) {
 
 return commandList;
 }
+
+  @Override
+  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
+if (super.isMemoryLimitViolated(withUpdatedLimits)) {
+  return true;
+}
+if (_resourceIsolationManager != null) {
+  // In the short term the goal is to not shoot anyone unless we really need to.
+  // The on heap should limit the memory usage in most cases to a reasonable amount
+  // If someone is using way more than they requested this is a bug and we should
+  // not allow it
+  long usageMb;
+  long memoryLimitMb;
+  long hardMemoryLimitOver;
+  String typeOfCheck;
+
+  if (withUpdatedLimits.is_has_node_shared_memory()) {
+//We need to do enforcement on a topology level, not a single worker level...
+// Because in for cgroups each page in shared memory goes to the worker that touched it
+// first. We may need to make this more plugable in the future and let the resource
+// isolation manager tell us what to do
+usageMb = getTotalTopologyMemoryUsed();
+memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
+hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
+typeOfCheck = "TOPOLOGY " + _topologyId;
+  } else {
+usageMb = getMemoryUsageMb();
+memoryLimitMb = this.memoryLimitMB;
+hardMemoryLimitOver = this.hardMemoryLimitOver;
+typeOfCheck = "WORKER " + _workerId;
+  }
+  LOG.debug(
+  "Enforcing memory usage for {} with usgae of {} out of {} total and a hard limit of {}",
+  typeOfCheck,
+  usageMb,
+  memoryLimitMb,
+  hardMemoryLimitOver);
+
+  if (usageMb <= 0) {
+//Looks like usage might now be supported
+return false;
+  }
+  long hardLimitMb =
+  memoryLimitMb
+  + Math.max(
+  (long) (memoryLimitMb * (hardMemoryLimitMultiplier - 1.0)), hardMemoryLimitOver);
--- End diff --

Nit: I think this would be easier to read as max(memoryLimit*hardMultiplier, memoryLimit + hardLimitOver), rather than adding first and then adjusting the multiplier.
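The two forms are algebraically the same, because adding the limit distributes over `max`: L + max(L(m - 1), O) = max(Lm, L + O), where L is `memoryLimitMb`, m is `hardMemoryLimitMultiplier` and O is `hardMemoryLimitOver`. A small sketch comparing them (the class and method names are invented for illustration); up to floating-point rounding before the `(long)` cast, they should agree:

```java
// Compares the hard-limit expression from the diff with the form suggested in the review.
final class HardLimitSketch {
    // As written in the diff: the limit plus the larger of the two allowed overages.
    static long asWritten(long memoryLimitMb, double hardMemoryLimitMultiplier, long hardMemoryLimitOver) {
        return memoryLimitMb
            + Math.max((long) (memoryLimitMb * (hardMemoryLimitMultiplier - 1.0)), hardMemoryLimitOver);
    }

    // As suggested: the larger of the multiplied limit and the limit plus a fixed overage.
    static long suggested(long memoryLimitMb, double hardMemoryLimitMultiplier, long hardMemoryLimitOver) {
        return Math.max((long) (memoryLimitMb * hardMemoryLimitMultiplier), memoryLimitMb + hardMemoryLimitOver);
    }
}
```

For a 100 MB worker with a 1.5 multiplier and a 1000 MB overage, both give 1100 MB: the fixed overage wins, which is what lets small workers avoid being shot.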


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120433627
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -19,36 +19,82 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-//TODO: improve this by maintaining slot -> executors as well for more efficient operations
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class SchedulerAssignmentImpl implements SchedulerAssignment {
+private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
+
 /**
  * topology-id this assignment is for.
  */
-String topologyId;
+private final String topologyId;
+
 /**
  * assignment detail, a mapping from executor to WorkerSlot
  */
-Map executorToSlot;
-
-public SchedulerAssignmentImpl(String topologyId, Map executorToSlots) {
-this.topologyId = topologyId;
-this.executorToSlot = new HashMap<>(0);
-if (executorToSlots != null) {
-this.executorToSlot.putAll(executorToSlots);
+private final Map executorToSlot = new HashMap<>();
+private final Map resources = new HashMap<>();
+private final Map totalSharedOffHeap = new HashMap<>();
+
+public SchedulerAssignmentImpl(String topologyId, Map executorToSlot,
+Map resources, Map totalSharedOffHeap) {
+this.topologyId = topologyId;   
+if (executorToSlot != null) {
--- End diff --

Nit: Could get rid of null checks by using Collections.emptyMap() in the constructor in L72 instead
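A minimal sketch of that suggestion on a simplified, hypothetical `AssignmentSketch` (String and Double stand in for the real `ExecutorDetails`, `WorkerSlot` and `WorkerResources` types). Because the convenience constructor passes `Collections.emptyMap()` instead of `null`, the fields are always non-null, which also makes the normalization step questioned in the `equals` comment above unnecessary.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for SchedulerAssignmentImpl.
final class AssignmentSketch {
    private final String topologyId;
    private final Map<String, String> executorToSlot = new HashMap<>();
    private final Map<String, Double> resources = new HashMap<>();

    // Convenience constructor: delegate with an empty map rather than null.
    AssignmentSketch(String topologyId, Map<String, String> executorToSlot) {
        this(topologyId, executorToSlot, Collections.emptyMap());
    }

    AssignmentSketch(String topologyId, Map<String, String> executorToSlot, Map<String, Double> resources) {
        this.topologyId = Objects.requireNonNull(topologyId, "topologyId");
        // No null checks: callers always hand over a (possibly empty) map.
        this.executorToSlot.putAll(executorToSlot);
        this.resources.putAll(resources);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof AssignmentSketch)) {
            return false;
        }
        AssignmentSketch o = (AssignmentSketch) other;
        // The maps are final and never null, so they can be compared directly.
        return topologyId.equals(o.topologyId)
            && executorToSlot.equals(o.executorToSlot)
            && resources.equals(o.resources);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topologyId, executorToSlot, resources);
    }
}
```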


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120444941
  
--- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
@@ -889,10 +890,83 @@
 public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
 
 /**
- * The amount of memory a worker can exceed its allocation before cgroup will kill it
+ * Please use STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB instead. The amount of memory a
+ * worker can exceed its allocation before cgroup will kill it.
+ */
+@isPositiveNumber(includeZero = true)
+public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
+"storm.cgroup.memory.limit.tolerance.margin.mb";
+
+/**
+ * Java does not always play nicely with cgroups. It is coming but not fully implemented and not
+ * for the way storm uses cgroups. In the short term you can disable the hard memory enforcement
+ * by cgroups and let the supervisor handle shooting workers going over their limit in a kinder
+ * way.
+ */
+@isBoolean
+public static String STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE = "storm.cgroup.memory.enforcement.enable";
+
+// Configs for memory enforcement does by the supervisor (not cgroups directly)
+
+/**
+ * Memory given to each worker for free (because java and storm have some overhead). This is
+ * memory on the box that the workers can use. This should not be included in
+ * SUPERVISOR_MEMORY_CAPACITY_MB, as nimbus does not use this memory for scheduling.
+ */
+@isPositiveNumber
+public static String STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
+"storm.supervisor.memory.limit.tolerance.margin.mb";
+
+/**
+ * A multiplier for the memory limit of a worker that will have the supervisor shoot it
+ * immediately. 1.0 means shoot the worker as soon as it goes over. 2.0 means shoot the worker if
+ * its usage is double what was requested. This value is combined with
+ * STORM_SUPERVISOR_HARD_MEMORY_LIMIT_OVERAGE and which ever is greater is used for enforcement.
+ * This allows small workers to not be shot.
+ */
+@isPositiveNumber
+public static String STORM_SUPERVISOR_HARD_MEMORY_LIMIT_MULTIPLIER =
+"storm.supervisor.hard.memory.limit.multiplier";
+
+/**
+ * If the memory usage of a worker goes over its limit by this value is it shot immediately. This
+ * value is combined with STORM_SUPERVISOR_HARD_LIMIT_MEMORY_MULTIPLIER and which ever is greater
+ * is used for enforcement. This allows small workers to not be shot.
+ */
+@isPositiveNumber(includeZero = true)
+public static String STORM_SUPERVISOR_HARD_LIMIT_MEMORY_OVERAGE = "storm.supervisor.hard.memory.limit.overage";
+
+/**
+ * If the amount of memory that is free in the system (either on the box or in the supervisor's
+ * cgroup) is below this number (in MB) consider the system to be in low memory mode and start
+ * shooting workers if they are over their limit.
+ */
+@isPositiveNumber
+public static String STORM_SUPERVISOR_LOW_MEMORY_THRESHOLD = "storm.supervisor.low.memory.threshold";
+
+/**
+ * If the amount of memory that is free in the system (either on the box or in the supervisor's
+ * cgroup) is below this number (in MB) consider the system to be a little low on memory and start
+ * shooting workers if they are over their limit for a given grace period
+ * STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD.
+ */
+@isPositiveNumber
+public static String STORM_SUPERVISOR_MEDIUM_MEMORY_THRESHOLD = "storm.supervisor.medium.memory.threshold";
+
+/**
+ * The number of milliseconds that a worker is allowed to be over their limit when there is a
+ * medium amount of memory free in the system.
  */
 @isPositiveNumber
-public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB = "storm.cgroup.memory.limit.tolerance.margin.mb";
+public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD =
+"storm.supervisor.medium.memory.grace.period";
+
+/**
+ * @{see Config#TOPOLOGY_SCHEDULER_STRATEGY} this allows us to validate on the server side that it is
--- End diff --

Could this comment be rephrased? I'm a little confused about what this should be set to, or what it does.
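For readers trying to follow how the supervisor-side memory settings in this hunk fit together, here is one possible reading of their Javadoc as a hypothetical decision function. It is not Storm's enforcement code; the structure, names, and order of checks are assumptions based only on the comments above. The hard limit is passed in already computed from the multiplier and overage settings, and the threshold parameters stand for storm.supervisor.low.memory.threshold, storm.supervisor.medium.memory.threshold and storm.supervisor.medium.memory.grace.period.

```java
// Hypothetical reading of the supervisor memory settings; not Storm's actual code.
final class SupervisorMemoryPolicySketch {
    enum Decision { KEEP, KILL }

    static Decision decide(long usageMb, long limitMb, long hardLimitMb, long freeMb,
                           long lowThresholdMb,     // storm.supervisor.low.memory.threshold
                           long mediumThresholdMb,  // storm.supervisor.medium.memory.threshold
                           long msOverLimit,        // how long usage has exceeded limitMb
                           long gracePeriodMs) {    // storm.supervisor.medium.memory.grace.period
        if (usageMb > hardLimitMb) {
            return Decision.KILL; // far over the request: shoot immediately
        }
        if (usageMb <= limitMb) {
            return Decision.KEEP; // within what was requested
        }
        if (freeMb < lowThresholdMb) {
            return Decision.KILL; // low memory mode: being over the limit is enough
        }
        if (freeMb < mediumThresholdMb && msOverLimit >= gracePeriodMs) {
            return Decision.KILL; // medium memory: over the limit for longer than the grace period
        }
        return Decision.KEEP; // enough free memory: tolerate the overage for now
    }
}
```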


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120439921
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
@@ -559,6 +565,23 @@ public T addConfigurations(Map conf) {
 _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
 return (T) this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public T addSharedMemory(SharedMemory request) {
+SharedMemory found = _sharedMemory.get(request.get_name());
+if (found != null && !found.equals(request)) {
+throw new IllegalArgumentException("Cannot have multiple different shared memory regions with the same name");
+}
+_sharedMemory.put(request.get_name(), request);
+Set mems = _componentToSharedMemory.get(_id);
--- End diff --

Nit: Could be shortened to `_componentToSharedMemory.computeIfAbsent(_id, HashSet::new)`
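A small sketch of the `computeIfAbsent` pattern on a hypothetical stand-in class, assuming the map is keyed by the String component id (as `_id` appears to be). One caveat: `computeIfAbsent` passes the key to the mapping function, and `HashSet` has no constructor that accepts a `String`, so `HashSet::new` would not compile for a String-keyed map; a lambda such as `k -> new HashSet<>()` does.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the component -> shared-memory-name index in TopologyBuilder.
final class SharedMemoryIndexSketch {
    private final Map<String, Set<String>> componentToSharedMemory = new HashMap<>();

    void add(String componentId, String sharedMemoryName) {
        // Creates the set on first use, then adds to it; the lambda ignores the key.
        componentToSharedMemory.computeIfAbsent(componentId, k -> new HashSet<>()).add(sharedMemoryName);
    }

    Set<String> get(String componentId) {
        return componentToSharedMemory.getOrDefault(componentId, Collections.emptySet());
    }
}
```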


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120432365
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
@@ -21,41 +21,51 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.WorkerResources;
+
 public interface SchedulerAssignment {
 /**
  * Does this slot occupied by this assignment?
  * @param slot
- * @return
+ * @return true the slot is occupied else false
--- End diff --

Nitpick: missing "if" between true and the. Also the comment two lines up is phrased a little weirdly.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120444024
  
--- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
@@ -889,10 +890,83 @@
 public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
 
 /**
- * The amount of memory a worker can exceed its allocation before cgroup will kill it
+ * Please use STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB instead. The amount of memory a
+ * worker can exceed its allocation before cgroup will kill it.
+ */
+@isPositiveNumber(includeZero = true)
+public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
+"storm.cgroup.memory.limit.tolerance.margin.mb";
+
+/**
+ * Java does not always play nicely with cgroups. It is coming but not fully implemented and not
+ * for the way storm uses cgroups. In the short term you can disable the hard memory enforcement
+ * by cgroups and let the supervisor handle shooting workers going over their limit in a kinder
+ * way.
+ */
+@isBoolean
+public static String STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE = "storm.cgroup.memory.enforcement.enable";
+
+// Configs for memory enforcement does by the supervisor (not cgroups directly)
--- End diff --

nit: does -> done(?)


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120451383
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -728,8 +758,145 @@ protected String javaCmd(String cmd) {
 
 return commandList;
 }
+
+  @Override
+  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
+if (super.isMemoryLimitViolated(withUpdatedLimits)) {
+  return true;
+}
+if (_resourceIsolationManager != null) {
+  // In the short term the goal is to not shoot anyone unless we really need to.
+  // The on heap should limit the memory usage in most cases to a reasonable amount
+  // If someone is using way more than they requested this is a bug and we should
+  // not allow it
+  long usageMb;
+  long memoryLimitMb;
+  long hardMemoryLimitOver;
+  String typeOfCheck;
+
+  if (withUpdatedLimits.is_has_node_shared_memory()) {
+//We need to do enforcement on a topology level, not a single worker level...
+// Because in for cgroups each page in shared memory goes to the worker that touched it
+// first. We may need to make this more plugable in the future and let the resource
+// isolation manager tell us what to do
+usageMb = getTotalTopologyMemoryUsed();
+memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
+hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
+typeOfCheck = "TOPOLOGY " + _topologyId;
+  } else {
+usageMb = getMemoryUsageMb();
+memoryLimitMb = this.memoryLimitMB;
+hardMemoryLimitOver = this.hardMemoryLimitOver;
+typeOfCheck = "WORKER " + _workerId;
+  }
+  LOG.debug(
+  "Enforcing memory usage for {} with usgae of {} out of {} total and a hard limit of {}",
+  typeOfCheck,
+  usageMb,
+  memoryLimitMb,
+  hardMemoryLimitOver);
+
+  if (usageMb <= 0) {
+//Looks like usage might now be supported
--- End diff --

now -> not(?)


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120456435
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.nimbus;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Test;
+
+public class NimbusTest {
+@Test(expected=IllegalArgumentException.class)
--- End diff --

Nit: Could make the scope of this assertion more local with the ExpectedException rule.


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120407010
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---
@@ -34,65 +39,119 @@
 import org.apache.storm.tuple.Values;
 
 public class ResourceAwareExampleTopology {
-  public static class ExclamationBolt extends BaseRichBolt {
-OutputCollector _collector;
-
-@Override
-public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-  _collector = collector;
-}
-
-@Override
-public void execute(Tuple tuple) {
-  _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-  _collector.ack(tuple);
+public static class ExclamationBolt extends BaseRichBolt {
+//Have a crummy cache to show off shared memory accounting
+private static final ConcurrentHashMap myCrummyCache =
+new ConcurrentHashMap<>();
+private static final int CACHE_SIZE = 100_000;
+OutputCollector _collector;
+
+protected static String getFromCache(String key) {
+return myCrummyCache.get(key);
+}
+
+protected static void addToCache(String key, String value) {
+myCrummyCache.putIfAbsent(key, value);
+int numToRemove = myCrummyCache.size() - CACHE_SIZE;
+if (numToRemove > 0) {
+//Remove something randomly...
+Iterator> it = myCrummyCache.entrySet().iterator();
+for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
+it.next();
+it.remove();
+}
+}
+}
+
+@Override
+public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+_collector = collector;
+}
+
+@Override
+public void execute(Tuple tuple) {
+String orig = tuple.getString(0);
+String ret = getFromCache(orig);
+if (ret == null) {
+ret = orig + "!!!";
+addToCache(orig, ret);
+}
+_collector.emit(tuple, new Values(ret));
+_collector.ack(tuple);
+}
+
+@Override
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+declarer.declare(new Fields("word"));
+}
 }
 
-@Override
-public void declareOutputFields(OutputFieldsDeclarer declarer) {
-  declarer.declare(new Fields("word"));
+public static void main(String[] args) throws Exception {
+TopologyBuilder builder = new TopologyBuilder();
+
+//A topology can set resources in terms of CPU and Memory for each component
+// These can be chained (like with setting the CPU requirement)
+SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10).setCPULoad(20);
+// Or done separately like with setting the
+// onheap and offheap memory requirement
+spout.setMemoryLoad(64, 16);
+//On heap memory is used to help calculate the heap of the java process for the worker
+// off heap memory is for things like JNI memory allocated off heap, or when using the
+// ShellBolt or ShellSpout.  In this case the 16 MB of off heap is just as an example
+// as we are not using it.
+
+// Some times a Bolt or Spout will have some memory that is shared between the instances
+// These are typically caches, but could be anything like a static database that is memory
+// mapped into the processes. These can be declared separately and added to the bolts and
+// spouts that use them.  Or if only one uses it they can be created inline with the add
+SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
+SharedOffHeapWithinNode notImplementedButJustAnExample =
+new SharedOffHeapWithinNode(500, "not-implemented-node-level-cache");
+
+//If CPU or memory is not set the values stored in topology.component.resources.onheap.memory.mb,
+// topology.component.resources.offheap.memory.mb and topology.component.cpu.pcore.percent
+// will be used instead
+builder
+.setBolt("exclaim1", new ExclamationBolt(), 3)
+.shuffleGrouping("word")
+.addSharedMemory(exclaimCache);
+
+builder
+.setBolt("exclaim2", new ExclamationBolt(), 2)
+.shuffleGrouping("exclaim1")
+.setMemoryLoad(100)
  

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120459423
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---
@@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map resou
 }
 }
 
-if (totalMem != null) {
-MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
-try {
-memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
-} catch (IOException e) {
-throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
+if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
+if (totalMem != null) {
+int cgroupMem =
+(int)
+(Math.ceil(
+ObjectReader.getDouble(
+this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
--- End diff --

I'm wondering if having defaults here is necessary? Isn't defaults.yaml 
always expected to be present?
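
For context on the defaults question, here is a minimal Java sketch of the arithmetic in this hunk. It assumes the margin is read from the config map with a 0.0 fallback, standing in for `ObjectReader.getDouble(..., 0.0)`. The class and helper names are hypothetical; only the config key is taken from the quoted `defaults.yaml`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the cgroup memory-limit arithmetic quoted above.
// The key comes from defaults.yaml; readMarginMb/limitBytes are illustrative names.
class CgroupLimitSketch {
    static final String MARGIN_KEY = "storm.cgroup.memory.limit.tolerance.margin.mb";

    // Read the margin in MB, falling back to 0.0 when the key is absent.
    static double readMarginMb(Map<String, Object> conf) {
        Object v = conf.get(MARGIN_KEY);
        return v == null ? 0.0 : ((Number) v).doubleValue();
    }

    // Worker memory (MB) plus the rounded-up margin (MB), converted to bytes.
    static long limitBytes(long totalMemMb, double marginMb) {
        int cgroupMem = (int) Math.ceil(marginMb);
        return (totalMemMb + cgroupMem) * 1024L * 1024L;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(MARGIN_KEY, 128.0);
        System.out.println(limitBytes(1024, readMarginMb(conf)));            // 1207959552
        System.out.println(limitBytes(1024, readMarginMb(new HashMap<>()))); // 1073741824
    }
}
```

If `defaults.yaml` is always loaded, the fallback presumably only matters for configs assembled by hand, such as in tests.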


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120440846
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
@@ -29,8 +33,8 @@
  */
 public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
 
-private Map resources = new HashMap<>();
-private static Map conf = Utils.readStormConfig();
+private final transient Map resources = new HashMap<>();
--- End diff --

Why transient?
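
One reason to question `transient` here, sketched under the assumption that these declarers are Java-serialized along with the topology. Deserialization does not run the field initializers of a `Serializable` class, so a `final transient` map comes back as `null`. The `Declarer` class below is a hypothetical stand-in, not Storm's `DefaultResourceDeclarer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a final transient field with an initializer is null after a
// Java serialization round trip, because field initializers are not re-run.
class TransientSketch {
    static class Declarer implements Serializable {
        private static final long serialVersionUID = 1L;
        private final transient Map<String, Double> resources = new HashMap<>();
        private final Map<String, Double> kept = new HashMap<>();

        Map<String, Double> resources() { return resources; }
        Map<String, Double> kept() { return kept; }
    }

    // Serialize to bytes and read back, as shipping a topology component would.
    static Declarer roundTrip(Declarer d) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(d);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Declarer) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Declarer d = new Declarer();
        d.resources().put("cpu", 10.0);
        d.kept().put("cpu", 10.0);
        Declarer copy = roundTrip(d);
        System.out.println(copy.resources()); // null
        System.out.println(copy.kept());      // {cpu=10.0}
    }
}
```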


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120445145
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java ---
@@ -24,53 +24,76 @@
 import java.util.Set;
 
 /**
- * A plugin to support resource isolation and limitation within Storm
+ * A plugin to support resource isolation and limitation within Storm.
  */
 public interface ResourceIsolationInterface {
-
+
 /**
  * Called when starting up
+ *
  * @param conf the cluster config
  * @throws IOException on any error.
  */
 void prepare(Map conf) throws IOException;
 
 /**
- * This function should be used prior to starting the worker to reserve resources for the worker
+ * This function should be used prior to starting the worker to reserve resources for the worker.
+ *
  * @param workerId worker id of the worker to start
- * @param resources set of resources to limit
+ * @param workerMemory the amount of memory for the worker or null if not enforced
+ * @param workerCpu the amount of cpu for the worker or null if not enforced
  */
-void reserveResourcesForWorker(String workerId, Map resources);
+void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu);
 
 /**
- * This function will be called when the worker needs to shutdown.  This function should include logic to clean up after a worker is shutdown
+ * This function will be called when the worker needs to shutdown. This function should include logic to clean up
+ * after a worker is shutdown.
+ *
  * @param workerId worker id to shutdown and clean up after
  */
 void releaseResourcesForWorker(String workerId);
 
 /**
  * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used
  * to get the modified command line to launch the worker with resource isolation
- * @param existingCommand
+ *
+ * @param existingCommand the current command to run that may need to be modified.
  * @return new commandline with necessary additions to launch worker with resource isolation
  */
 List getLaunchCommand(String workerId, List existingCommand);
 
 /**
  * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). this function can be used
  * to get the launch command prefix
+ *
  * @param workerId the of the worker
  * @return the command line prefix for launching a worker with resource isolation
  */
 List getLaunchCommandPrefix(String workerId);
 
 /**
- * Get the list of PIDs currently in an isolated container
+ * Get the list of PIDs currently in an isolated container.
+ *
  * @param workerId the id of the worker to get these for
- * @return the set of PIDs, this will be combined with
- * other ways of getting PIDs. An Empty set if
- * no PIDs are found.
+ * @return the set of PIDs, this will be combined with other ways of getting PIDs. An Empty set if no PIDs are
+ * found.
  * @throws IOException on any error
  */
-Set getRunningPIDs(String workerId) throws IOException;
+Set getRunningPids(String workerId) throws IOException;
+
+/**
+ * Get the current memory usage of the a give worker.
--- End diff --

the a give worker -> the given worker
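
The javadoc above implies that the isolated launch command is the prefix followed by the existing command. Below is a hypothetical sketch of that contract for a cgroup-based plugin that prefixes `cgexec -g <controllers>:<path>`. The `/bin/cgexec` path and the `storm` root come from the quoted `defaults.yaml`. The `cpu,memory` controller list and the `<rootdir>/<workerId>` path layout are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: launch command = isolation prefix + unmodified existing command.
// Controller list and cgroup path layout are illustrative assumptions.
class LaunchCommandSketch {
    private final String cgexecCmd;
    private final String rootDir;

    LaunchCommandSketch(String cgexecCmd, String rootDir) {
        this.cgexecCmd = cgexecCmd;
        this.rootDir = rootDir;
    }

    List<String> getLaunchCommandPrefix(String workerId) {
        // cgexec -g <controllers>:<cgroup path> runs the following command inside that cgroup
        return Arrays.asList(cgexecCmd, "-g", "cpu,memory:" + rootDir + "/" + workerId);
    }

    List<String> getLaunchCommand(String workerId, List<String> existingCommand) {
        List<String> cmd = new ArrayList<>(getLaunchCommandPrefix(workerId));
        cmd.addAll(existingCommand);
        return cmd;
    }

    public static void main(String[] args) {
        LaunchCommandSketch s = new LaunchCommandSketch("/bin/cgexec", "storm");
        System.out.println(s.getLaunchCommand("worker-1", Arrays.asList("java", "-Xmx768m", "Worker")));
        // [/bin/cgexec, -g, cpu,memory:storm/worker-1, java, -Xmx768m, Worker]
    }
}
```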


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120432477
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
@@ -21,41 +21,51 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.WorkerResources;
+
 public interface SchedulerAssignment {
 /**
  * Does this slot occupied by this assignment?
  * @param slot
- * @return
+ * @return true the slot is occupied else false
  */
 public boolean isSlotOccupied(WorkerSlot slot);
 
 /**
  * is the executor assigned?
  * 
  * @param executor
- * @return
+ * @return true it is assigned else false
--- End diff --

Nit: missing if
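
With the missing "if" restored, the two return contracts read "true if the slot is occupied, else false" and "true if the executor is assigned, else false". Below is a simplified sketch of those semantics, using `String` in place of `WorkerSlot` and the executor type. The class is hypothetical, and `isExecutorAssigned` is an assumed name for the method documented as "is the executor assigned?".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SchedulerAssignment; slots and executors are plain Strings.
class AssignmentSketch {
    private final Map<String, String> executorToSlot = new HashMap<>();

    void assign(String slot, String executor) {
        executorToSlot.put(executor, slot);
    }

    /** @return true if at least one executor is assigned to the slot, else false */
    boolean isSlotOccupied(String slot) {
        // Linear scan over the values; a slot -> executors index would avoid this.
        return executorToSlot.containsValue(slot);
    }

    /** @return true if the executor is assigned to some slot, else false */
    boolean isExecutorAssigned(String executor) {
        return executorToSlot.containsKey(executor);
    }
}
```

The scan in `isSlotOccupied` is the kind of cost that the `//TODO` about maintaining slot -> executors (removed in the `SchedulerAssignmentImpl` diff) refers to.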


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120458603
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -144,8 +148,19 @@ public Void call() throws Exception {
 File tr = new File(tmproot);
 try {
 downloadBaseBlobs(tr);
+if (_assignment.is_set_total_node_shared()) {
+File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
+//We need to create a directory for shared memory to write to (we should not encourage this though)
+Path path = sharedMemoryDirTmpLocation.toPath();
+Files.createDirectories(path);
+}
 _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-_fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
+Map topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+_fsOps.setupStormCodeDir(topoConf, _stormRoot);
+if (_assignment.is_has_node_shared_memory()) {
+File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
--- End diff --

I might have missed it somewhere, but how does the topology code access the 
off heap shared memory?
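
This thread leaves open how topology code reaches the directory. For reference, the hunk itself builds the topology directory (including `shared_by_topology`) under a temporary path and then moves it into place. The sketch below shows that pattern, assuming `moveDirectoryPreferAtomic` tries an atomic rename first and falls back to a plain move. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: stage the topology dir under a temp path, then move it into place.
// The fallback behaviour of the real _fsOps.moveDirectoryPreferAtomic is assumed, not confirmed.
class LocalizeSketch {
    static final String SHARED_DIR = "shared_by_topology";

    static void localize(Path tmpRoot, Path stormRoot, boolean hasNodeSharedMemory) throws IOException {
        Files.createDirectories(tmpRoot);
        if (hasNodeSharedMemory) {
            // createDirectories is a no-op if the directory already exists
            Files.createDirectories(tmpRoot.resolve(SHARED_DIR));
        }
        moveDirectoryPreferAtomic(tmpRoot, stormRoot);
    }

    static void moveDirectoryPreferAtomic(Path from, Path to) throws IOException {
        try {
            Files.move(from, to, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Non-atomic fallback; for a non-empty directory this only succeeds
            // when the move can be done as a rename on the same file store.
            Files.move(from, to);
        }
    }
}
```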


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120433134
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
@@ -19,36 +19,82 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-//TODO: improve this by maintaining slot -> executors as well for more efficient operations
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class SchedulerAssignmentImpl implements SchedulerAssignment {
+private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
+
 /**
  * topology-id this assignment is for.
  */
-String topologyId;
+private final String topologyId;
+
 /**
  * assignment detail, a mapping from executor to WorkerSlot
  */
-Map executorToSlot;
-
-public SchedulerAssignmentImpl(String topologyId, Map executorToSlots) {
-this.topologyId = topologyId;
-this.executorToSlot = new HashMap<>(0);
-if (executorToSlots != null) {
-this.executorToSlot.putAll(executorToSlots);
+private final Map executorToSlot = new HashMap<>();
+private final Map resources = new HashMap<>();
+private final Map totalSharedOffHeap = new HashMap<>();
--- End diff --

Nit: Rename so it's apparent what the string is in this map
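
Below is a hypothetical sketch of the rename. It assumes the `String` key is a node id and the value is that node's shared off-heap total in MB. It also assumes each named region is counted once per node, no matter how many executors on that node use it. The region name reuses `not-implemented-node-level-cache` from the example topology.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: naming the map after its key makes the String's meaning obvious.
class SharedOffHeapSketch {
    private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
    private final Map<String, Set<String>> nodeIdToRegionNames = new HashMap<>();

    // Add a node-shared region; repeated uses of the same region on one node count once.
    void addRegion(String nodeId, String regionName, double sizeMb) {
        Set<String> seen = nodeIdToRegionNames.computeIfAbsent(nodeId, k -> new HashSet<>());
        if (seen.add(regionName)) {
            nodeIdToTotalSharedOffHeap.merge(nodeId, sizeMb, Double::sum);
        }
    }

    double totalFor(String nodeId) {
        return nodeIdToTotalSharedOffHeap.getOrDefault(nodeId, 0.0);
    }
}
```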


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120446703
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---
@@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map resou
 }
 }
 
-if (totalMem != null) {
-MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
-try {
-memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
-} catch (IOException e) {
-throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
+if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
+if (totalMem != null) {
+int cgroupMem =
+(int)
+(Math.ceil(
+ObjectReader.getDouble(
+this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
+0.0)));
+long memLimit = Long.valueOf((totalMem.longValue() + cgroupMem) * 1024 * 1024);
+MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
+try {
+memCore.setPhysicalUsageLimit(memLimit);
+} catch (IOException e) {
+throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
+}
+// need to set memory.memsw.limit_in_bytes after setting memory.limit_in_bytes or error
+// might occur
+try {
+memCore.setWithSwapUsageLimit(memLimit);
+} catch (IOException e) {
+throw new RuntimeException("Cannot set memory.memsw.limit_in_bytes! Exception: ", e);
+}
 }
 }
 }
 
+@Override
 public void releaseResourcesForWorker(String workerId) {
 CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup);
 try {
 Set tasks = workerGroup.getTasks();
 if (!tasks.isEmpty()) {
-throw new Exception("Cannot correctly showdown worker CGroup " + workerId + "tasks " + tasks.toString() + " still running!");
+throw new Exception("Cannot correctly showdown worker CGroup " + workerId + "tasks " + tasks
--- End diff --

nit: showdown -> shutdown
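
Separately from the typo, the hunk's comment about ordering the two limits reflects a cgroup v1 rule. As we understand it, the kernel rejects any write that would leave `memory.memsw.limit_in_bytes` below `memory.limit_in_bytes`. Below is a hypothetical sketch of the resulting write order, assuming both limits currently hold the same value (for example, both unlimited on a fresh cgroup).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: order the two cgroup v1 writes so that, after each one,
// limit_in_bytes <= memsw.limit_in_bytes still holds. Assumes both start equal.
class MemswOrderSketch {
    static List<String> writeOrder(long currentLimit, long newLimit) {
        if (newLimit <= currentLimit) {
            // Lowering (e.g. from "unlimited"): shrink the plain limit first.
            return Arrays.asList("memory.limit_in_bytes", "memory.memsw.limit_in_bytes");
        }
        // Raising: grow the memory+swap limit first.
        return Arrays.asList("memory.memsw.limit_in_bytes", "memory.limit_in_bytes");
    }
}
```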


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120406827
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---
@@ -34,65 +39,119 @@
 import org.apache.storm.tuple.Values;
 
 public class ResourceAwareExampleTopology {
-  public static class ExclamationBolt extends BaseRichBolt {
-OutputCollector _collector;
-
-@Override
-public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-  _collector = collector;
-}
-
-@Override
-public void execute(Tuple tuple) {
-  _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-  _collector.ack(tuple);
+public static class ExclamationBolt extends BaseRichBolt {
+//Have a crummy cache to show off shared memory accounting
+private static final ConcurrentHashMap myCrummyCache =
+new ConcurrentHashMap<>();
+private static final int CACHE_SIZE = 100_000;
+OutputCollector _collector;
+
+protected static String getFromCache(String key) {
+return myCrummyCache.get(key);
+}
+
+protected static void addToCache(String key, String value) {
+myCrummyCache.putIfAbsent(key, value);
+int numToRemove = myCrummyCache.size() - CACHE_SIZE;
+if (numToRemove > 0) {
+//Remove something randomly...
+Iterator> it = myCrummyCache.entrySet().iterator();
+for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
+it.next();
+it.remove();
+}
+}
+}
+
+@Override
+public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+_collector = collector;
+}
+
+@Override
+public void execute(Tuple tuple) {
+String orig = tuple.getString(0);
+String ret = getFromCache(orig);
+if (ret == null) {
+ret = orig + "!!!";
+addToCache(orig, ret);
+}
+_collector.emit(tuple, new Values(ret));
+_collector.ack(tuple);
+}
+
+@Override
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+declarer.declare(new Fields("word"));
+}
 }
 
-@Override
-public void declareOutputFields(OutputFieldsDeclarer declarer) {
-  declarer.declare(new Fields("word"));
+public static void main(String[] args) throws Exception {
+TopologyBuilder builder = new TopologyBuilder();
+
+//A topology can set resources in terms of CPU and Memory for each component
+// These can be chained (like with setting the CPU requirement)
+SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10).setCPULoad(20);
+// Or done separately like with setting the
+// onheap and offheap memory requirement
+spout.setMemoryLoad(64, 16);
+//On heap memory is used to help calculate the heap of the java process for the worker
+// off heap memory is for things like JNI memory allocated off heap, or when using the
+// ShellBolt or ShellSpout.  In this case the 16 MB of off heap is just as an example
+// as we are not using it.
+
+// Some times a Bolt or Spout will have some memory that is shared between the instances
+// These are typically caches, but could be anything like a static database that is memory
+// mapped into the processes. These can be declared separately and added to the bolts and
+// spouts that use them.  Or if only one uses it they can be created inline with the add
+SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
+SharedOffHeapWithinNode notImplementedButJustAnExample =
+new SharedOffHeapWithinNode(500, "not-implemented-node-level-cache");
+
+//If CPU or memory is not set the values stored in topology.component.resources.onheap.memory.mb,
+// topology.component.resources.offheap.memory.mb and topology.component.cpu.pcore.percent
+// will be used instead
+builder
+.setBolt("exclaim1", new ExclamationBolt(), 3)
+.shuffleGrouping("word")
+.addSharedMemory(exclaimCache);
+
+builder
+.setBolt("exclaim2", new ExclamationBolt(), 2)
+.shuffleGrouping("exclaim1")
+.setMemoryLoad(100)
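
Below is a standalone copy of the example bolt's bounded-cache logic, useful for trying it outside a topology. The archive stripped the generic types from the diff, so `ConcurrentHashMap<String, String>` and `Iterator<Map.Entry<String, String>>` are inferred from `getFromCache` returning a `String`. The size limit here is a constructor parameter instead of the hard-coded `CACHE_SIZE`.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the "crummy cache": insert if absent, then trim back to maxSize.
// ConcurrentHashMap iteration order is unspecified, so evicted entries are effectively arbitrary.
class CrummyCacheSketch {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
    private final int maxSize;

    CrummyCacheSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    String get(String key) {
        return cache.get(key);
    }

    void add(String key, String value) {
        cache.putIfAbsent(key, value);
        int numToRemove = cache.size() - maxSize;
        if (numToRemove > 0) {
            Iterator<Map.Entry<String, String>> it = cache.entrySet().iterator();
            for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
                it.next();
                it.remove();
            }
        }
    }

    int size() {
        return cache.size();
    }
}
```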
  

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120407132
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -242,7 +241,8 @@
  * The strategy to use when scheduling a topology with Resource Aware Scheduler
  */
 @NotNull
-@isImplementationOfClass(implementsClass = IStrategy.class)
+@isString
+//TODO @isImplementationOfClass(implementsClass = IStrategy.class)
--- End diff --

TODO
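
For context, below is a hypothetical sketch of what an `@isImplementationOfClass`-style check could do once the config value is a `String`. It loads the named class and confirms that it implements the expected interface. `Strategy` stands in for `IStrategy`; Storm's actual validator may work differently.

```java
// Hypothetical sketch: validate that a class name refers to an implementation of an interface.
class StrategyCheckSketch {
    interface Strategy { }

    static class GoodStrategy implements Strategy { }

    static boolean implementsInterface(String className, Class<?> iface) {
        try {
            Class<?> cls = Class.forName(className);
            return iface.isAssignableFrom(cls);
        } catch (ClassNotFoundException e) {
            return false; // unknown class names fail validation
        }
    }

    public static void main(String[] args) {
        System.out.println(implementsInterface(GoodStrategy.class.getName(), Strategy.class)); // true
        System.out.println(implementsInterface("java.lang.String", Strategy.class));           // false
    }
}
```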


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120400360
  
--- Diff: conf/defaults.yaml ---
@@ -306,12 +307,18 @@ storm.cgroup.resources:
 storm.cgroup.hierarchy.name: "storm"
 storm.supervisor.cgroup.rootdir: "storm"
 storm.cgroup.cgexec.cmd: "/bin/cgexec"
-storm.cgroup.memory.limit.tolerance.margin.mb: 128.0
+storm.cgroup.memory.limit.tolerance.margin.mb: 0.0
+storm.supervisor.memory.limit.tolerance.margin.mb: 128.0
+storm.supervisor.hard.memory.limit.multiplier: 2.0
+storm.supervisor.hard.memory.limit.overage: 2024
--- End diff --

Nitpick: Some of these properties are missing the unit name


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120430454
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
@@ -389,5 +397,11 @@ public LinearDRPCInputDeclarer addConfigurations(Map conf) {
 _component.componentConfs.add(conf);
 return this;
 }
+
+@Override
+public LinearDRPCInputDeclarer addSharedMemory(SharedMemory request) {
+_component.sharedMemory.add(request);
+return null;
--- End diff --

Should this return null?
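
Returning `null` breaks the fluent chaining used elsewhere in these examples (e.g. `setBolt(...).shuffleGrouping(...).addSharedMemory(...)`): the next chained call throws a `NullPointerException`. Below is a hypothetical, simplified declarer showing the usual `return this;` pattern.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified declarer: each setter returns this so calls can be chained.
class FluentSketch {
    static class Declarer {
        final List<String> sharedMemory = new ArrayList<>();
        final List<String> groupings = new ArrayList<>();

        Declarer addSharedMemory(String request) {
            sharedMemory.add(request);
            return this; // returning null here would break the next chained call
        }

        Declarer shuffleGrouping(String component) {
            groupings.add(component);
            return this;
        }
    }
}
```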


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-06-06 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2113#discussion_r120404519
  
--- Diff: docs/Resource_Aware_Scheduler_overview.md ---
@@ -45,38 +46,53 @@ For a Storm Topology, the user can now specify the amount of resources a topolog
 ### Setting Memory Requirement
 
 API to set component memory requirement:
-
+```
 public T setMemoryLoad(Number onHeap, Number offHeap)
-
+```
 Parameters:
 * Number onHeap – The amount of on heap memory an instance of this component will consume in megabytes
 * Number offHeap – The amount of off heap memory an instance of this component will consume in megabytes
 
 The user also has to option to just specify the on heap memory requirement if the component does not have an off heap memory need.
-
+```
 public T setMemoryLoad(Number onHeap)
-
+```
 Parameters:
 * Number onHeap – The amount of on heap memory an instance of this component will consume
 
 If no value is provided for offHeap, 0.0 will be used. If no value is provided for onHeap, or if the API is never called for a component, the default value will be used.
 
 Example of Usage:
-
+```
 SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
 s1.setMemoryLoad(1024.0, 512.0);
 builder.setBolt("exclaim1", new ExclamationBolt(), 3)
 .shuffleGrouping("word").setMemoryLoad(512.0);
-
+```
 The entire memory requested for this topology is 16.5 GB. That is from 10 spouts with 1GB on heap memory and 0.5 GB off heap memory each and 3 bolts with 0.5 GB on heap memory each.
 
+
+### Shared Memory
+
+In some cases you may have memory that is shared between components. It may be a cache shared within a worker between instances of a bolt, or it might be static data that is memory mapped into a bolt and is shared accross a worker.  In any case you can specify your share memory request by
+creating one of `SharedOffHeapWithinNode`, `SharedOffHeapWithinWorker`, or `SharedOnHeap` and adding it to bolts and spouts that use that shared memory.
--- End diff --

Is there a difference between `SharedOnHeap` and just declaring a static 
field?
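
Two hedged checks on this section. First, the 16.5 GB total works out as `10 * (1024 + 512) MB + 3 * 512 MB = 16896 MB`. Second, on the static-field question, one plausible difference is that the scheduler cannot see a plain static field, while a declared `SharedOnHeap` region gives it a size to account for. The sketch below assumes each shared region is counted once per worker and uses a hypothetical 128 MB per-executor on-heap value.

```java
// Worked check of the docs example plus an assumed worker-heap estimate with a shared region.
class MemoryMathSketch {
    // Per-instance memory: every executor pays its own on-heap + off-heap.
    static double perInstanceMb(int instances, double onHeapMb, double offHeapMb) {
        return instances * (onHeapMb + offHeapMb);
    }

    // Assumed estimate: executors' on-heap plus each shared region counted once.
    static double workerHeapMb(int executorsInWorker, double onHeapMb, double... sharedRegionsMb) {
        double total = executorsInWorker * onHeapMb;
        for (double r : sharedRegionsMb) {
            total += r;
        }
        return total;
    }

    public static void main(String[] args) {
        double spouts = perInstanceMb(10, 1024.0, 512.0); // 15360 MB
        double bolts = perInstanceMb(3, 512.0, 0.0);      // 1536 MB
        System.out.println((spouts + bolts) / 1024.0);    // 16.5 (GB)
        // Three exclaim1 executors in one worker sharing the 100 MB "exclaim-cache"
        System.out.println(workerHeapMb(3, 128.0, 100.0)); // 484.0
    }
}
```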


[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

2017-05-12 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/2113

STORM-2497: Let Supervisor enforce memory and add in support for shared 
memory regions

This is based off of #2112 so that the formatting is simpler.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-2497

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2113


commit d8903b8358497c1b7c0ddb3588e779d5ce7c13b5
Author: Robert (Bobby) Evans 
Date:   2017-04-28T21:10:29Z

STORM-2497: Let Supervisor enforce memory and add in support for shared
memory regions



