[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 @revans2 thanks for the explanation ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 LGTM +1. @revans2 up to you if you want to do something more about whether those warnings should be dealt with in a more serious manner ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 @revans2 Sorry for not being more clear about what I meant in regards to using the heap instead of stack space. Currently, the algorithm uses recursion to backtrack, which means it is going to use space on the call stack. This can cause a StackOverflowError if the stack size is not large enough to handle TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH. Thus, you might need to configure the JVM stack space (-Xss) in Nimbus to appropriately match what is expected to be set for TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH. What I am suggesting is that with an iterative implementation of the recursive algorithm, we would not need to put so many frames on the call stack; the state would live on the heap instead, so you wouldn't need to configure the stack space for the Nimbus daemon. Not something critical, but rather something to think about. ---
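The rewrite being suggested can be sketched as a depth-first search driven by an explicit heap-allocated stack. This is only an illustrative toy (the `Frame`/`solve` names and the one-executor-per-node constraint are stand-ins, not Storm's actual ConstraintSolverStrategy code), showing how the recursion's call frames become objects on the heap so -Xss no longer bounds the search depth:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical sketch: recursive backtracking rewritten with an explicit
// stack on the heap, so search depth is bounded by heap size rather than
// by the JVM's -Xss thread-stack limit.
public class IterativeBacktrack {
    // One heap-allocated frame replaces one recursive call frame.
    private static final class Frame {
        final int execIndex;   // executor we are placing at this depth
        int nextChoice = 0;    // next node to try for this executor
        Frame(int execIndex) { this.execIndex = execIndex; }
    }

    /**
     * Assign each of numExecs executors to one of numNodes nodes such that
     * no two executors share a node (a stand-in for real constraints).
     * Returns the assignment, or null if none exists.
     */
    static int[] solve(int numExecs, int numNodes) {
        int[] assignment = new int[numExecs];
        Arrays.fill(assignment, -1);
        Deque<Frame> stack = new ArrayDeque<>();   // lives on the heap
        stack.push(new Frame(0));
        while (!stack.isEmpty()) {
            Frame f = stack.peek();
            if (f.execIndex == numExecs) {
                return assignment;                 // all executors placed
            }
            boolean advanced = false;
            while (f.nextChoice < numNodes) {
                int node = f.nextChoice++;
                if (ok(assignment, f.execIndex, node)) {
                    assignment[f.execIndex] = node;
                    stack.push(new Frame(f.execIndex + 1));
                    advanced = true;
                    break;
                }
            }
            if (!advanced) {                       // choices exhausted: backtrack
                stack.pop();
                if (!stack.isEmpty()) {
                    assignment[stack.peek().execIndex] = -1;
                }
            }
        }
        return null;
    }

    // Toy constraint: at most one executor per node.
    private static boolean ok(int[] assignment, int exec, int node) {
        for (int i = 0; i < exec; i++) {
            if (assignment[i] == node) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(solve(3, 3)));
    }
}
```

The depth of the search is now limited only by heap memory, so TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH would no longer need a matching -Xss setting.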
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 In terms of the configs, we may want to explore splitting Config.java into multiple pieces. Config.java has become a monolith in a sense. The Config.java in the storm-client module should probably only contain configs users can set in their topology. ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 Generally looks good, just some minor comments. @revans2 you can also implement a version of the algorithm using heap space in the JVM instead of stack space so you wouldn't need to tinker with the -Xss settings. May simplify operations. ---
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157680068 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java --- @@ -129,7 +129,15 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { protected TreeSet sortObjectResources( final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, final ExistingScheduleFunc existingScheduleFunc) { +return sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc); +} +/** + * Implelemtation of the sortObjectResources method so other strategies can reuse it. + */ +public static TreeSet sortObjectResourcesImpl( --- End diff -- I would suggest putting this in some sort of utility class. It's kind of awkward for the ConstraintSolverStrategy to call a static method in the GenericResourceAwareStrategy ---
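A minimal sketch of the suggested refactor, with stand-in types (a real version would take Storm's AllResources/ExecutorDetails arguments; `SchedulingUtils` is a hypothetical name): the shared sort lives in a neutral utility class that both strategies call, so ConstraintSolverStrategy no longer reaches into GenericResourceAwareStrategy:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical utility class holding the sorting logic shared by
// GenericResourceAwareStrategy and ConstraintSolverStrategy, so neither
// strategy depends on a static method of the other.
public class SchedulingUtils {
    /** Sort candidate objects by the given score ordering; both strategies call this. */
    public static <T> List<T> sortObjectResources(List<T> candidates,
                                                  Comparator<T> byScore) {
        List<T> sorted = new ArrayList<>(candidates);
        sorted.sort(byScore);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortObjectResources(
            java.util.Arrays.asList(3, 1, 2), Comparator.<Integer>naturalOrder()));
    }
}
```

With this shape, `GenericResourceAwareStrategy.sortObjectResources` and the constraint solver would both delegate to `SchedulingUtils.sortObjectResources(...)`.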
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157679506 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java --- @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.SchedulerAssignment; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.resource.RAS_Node; +import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { +private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class); + +protected static class SolverResult { +private final int statesSearched; +private final boolean success; +private final long timeTakenMillis; +private final int backtracked; + +public SolverResult(SearcherState state, boolean success) { +this.statesSearched = state.getStatesSearched(); +this.success = success; +timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; +backtracked = state.numBacktrack; +} + +public SchedulingResult asSchedulingResult() { +if (success) { +return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched ++ " states traversed in " + timeTakenMillis + "ms, backtracked " 
+ backtracked + " times)"); +} +return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, +"Cannot find scheduling that satisfies all constraints (" + statesSearched ++ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); +} +} + +protected static class SearcherState { +// Metrics +// How many states searched so far. +private int statesSearched = 0; +// Number of times we had to backtrack. +private int numBacktrack = 0; +final long startTimeMillis; +private final long maxEndTimeMs; + +// Current state +// The current executor we are trying to schedule +private int execIndex = 0; +// A map of the worker to the components in the worker to be able to enforce constraints. +private final Map> workerCompAssignment; +private final boolean[] okToRemoveFromWorker; +// for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints +private final Map> nodeCompAssignment; +private final boolean[] okToRemoveFromNod
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157677523 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java --- @@ -98,7 +98,7 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, } _spreadToSchedule = new HashMap<>(); - List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); + List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS); --- End diff -- I don't know about changing TOPOLOGY_SPREAD_COMPONENTS to TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS. It seems kind of weird to me that the multitenant scheduler would have a config that references RAS. ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 cool this got open sourced! ---
[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2400 @revans2 yup you are right about it being more about the size of the component. Great work! Thanks for sharing the detailed information, as it's a good learning experience for me! Keep me in the loop on how it goes. It would be great if Yahoo could release info about topology structure so people like me can run simulations of scheduling over a large number of topologies. ---
[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2400 @revans2 thanks for the explanation! Sorry my math was wrong! I understand the context a lot better now! Though my intuition tells me that if larger topologies usually get scheduled first, this will likely result in more fragmentation and fewer topologies being scheduled overall if there is a lot of resource contention. ---
[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2400 @revans2 can you elaborate on why EvictionPolicies are no longer needed? ---
[GitHub] storm issue #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2400 @revans2 interesting work! Though the formula used for the DefaultSchedulingPriorityStrategy seems to have a bias towards smaller topologies. For example: Total amount of Resource R in cluster = 1000. Topology 1 Resource R: Request = 100, Guarantee = 200. Topology 2 Resource R: Request = 10, Guarantee = 20. Topology 1 score = (100 - 0 - 200) / 1000 = 0.1. Topology 2 score = (10 - 0 - 20) / 1000 = 0.01. Topology 2 will be prioritized over Topology 1, correct? Even though the ratio of their respective request and guarantee is the same. This may cause larger topologies to be starved. ---
[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2400#discussion_r148923251 --- Diff: docs/Resource_Aware_Scheduler_overview.md --- @@ -303,28 +326,33 @@ To get an idea of how much memory/CPU your topology is actually using you can ad workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric"); conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics); ``` + The CPU metrics will require you to add + ``` org.apache.storm storm-metrics 1.0.0 ``` + as a topology dependency (1.0.0 or higher). You can then go to your topology on the UI, turn on the system metrics, and find the log that the LoggingMetricsConsumer is writing to. It will output results in the log like. + ``` 1454526100 node1.nodes.com:6707 -1:__system CPU {user-ms=74480, sys-ms=10780} 1454526100 node1.nodes.com:6707 -1:__system memory/nonHeap {unusedBytes=2077536, virtualFreeBytes=-64621729, initBytes=2555904, committedBytes=66699264, maxBytes=-1, usedBytes=64621728} 1454526100 node1.nodes.com:6707 -1:__system memory/heap {unusedBytes=573861408, virtualFreeBytes=694644256, initBytes=805306368, committedBytes=657719296, maxBytes=778502144, usedBytes=83857888} ``` + The metrics with -1:__system are generally metrics for the entire worker. In the example above that worker is running on node1.nodes.com:6707. These metrics are collected every 60 seconds. For the CPU you can see that over the 60 seconds this worker used 74480 + 10780 = 85260 ms of CPU time. This is equivalent to 85260/60000 or about 1.5 cores. The Memory usage is similar but look at the usedBytes. offHeap is 64621728 or about 62MB, and onHeap is 83857888 or about 80MB, but you should know what you set your heap to in each of your workers already. How do you divide this up per bolt/spout? That is a bit harder and may require some trial and error from your end. 
-## * Enhancements on original DefaultResourceAwareStrategy * +## *Enhancements on original DefaultResourceAwareStrategy* --- End diff -- The markdown does seem to be right for this header ---
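The CPU arithmetic in the quoted docs (milliseconds of CPU used over a 60-second metrics window, converted to cores) can be checked with a few lines; the class and method names here are just illustrative:

```java
// Worked version of the arithmetic in the quoted docs: a worker reports
// user-ms and sys-ms of CPU time per metrics window; dividing the total
// by the window length in ms gives the average number of cores used.
public class CpuCores {
    static double cores(long userMs, long sysMs, long windowMs) {
        return (double) (userMs + sysMs) / windowMs;
    }

    public static void main(String[] args) {
        // 74480 + 10780 = 85260 ms of CPU in a 60,000 ms window ~= 1.42 cores
        System.out.println(cores(74480, 10780, 60_000));
    }
}
```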
[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2400#discussion_r148923188 --- Diff: docs/Resource_Aware_Scheduler_overview.md --- @@ -243,58 +243,81 @@ http://dl.acm.org/citation.cfm?id=2814808 ### Specifying Topology Prioritization Strategy -The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies. For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface. A user can set the scheduling priority strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance: +The order of scheduling and eviction is determined by a pluggable interface in which the cluster owner can define how topologies should be scheduled. For the owner to define his or her own prioritization strategy, she or he needs to implement the ISchedulingPriorityStrategy interface. A user can set the scheduling priority strategy by setting the `DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY` to point to the class that implements the strategy. For instance: ``` resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy" ``` -A default strategy will be provided. The following explains how the default scheduling priority strategy works. + +Topologies are scheduled starting at the beginning of the list returned by this plugin. If there are not enough resources to schedule the topology others are evicted starting at the end of the list. Eviction stops when there are no lower priority topologies left to evict. **DefaultSchedulingPriorityStrategy** -The order of scheduling should be based on the distance between a user's current resource allocation and his or her guaranteed allocation. 
We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner? Let's use the average percentage of resource guarantees satisfied as a method of comparison. +In the past the order of scheduling was based on the distance between a user's current resource allocation and his or her guaranteed allocation. + +We currently use a slightly different approach. We simulate scheduling the highest priority topology for each user and score the topology for each of the resources using the formula + +``` +(Requested + Assigned - Guaranteed)/Available +``` + +Where + + * `Requested` is the resource requested by this topology (or a approximation of it for complex requests like shared memory) + * `Assigned` is the resources already assigned by the simulation. + * `Guaranteed` is the resource guarantee for this user + * `Available` is the amount of that resource currently available in the cluster. -For example: +This gives a score that is negative for guaranteed requests and a score that is positive for requests that are not within the guarantee. -|User|Resource Guarantee|Resource Allocated| -||--|--| -|A|<10 CPU, 50GB>|<2 CPU, 40 GB>| -|B|< 20 CPU, 25GB>|<15 CPU, 10 GB>| +To combine different resources the maximum of all the indavidual resource scores is used. This guarantees that if a user would go over a guarantee for a single resource it would not be offset by being under guarantee on any other resources. -User A's average percentage satisfied of resource guarantee: +For Example: -(2/10+40/50)/2 = 0.5 +Assume we have to schedule the following topologies. 
-User B's average percentage satisfied of resource guarantee: +|ID|User|CPU|Memory|Priority| +|---||---|--|---| +|A-1|A|100|1,000|1| +|A-2|A|100|1,000|10| +|B-1|B|100|1,000|1| +|B-2|B|100|1,000|10| -(15/20+10/25)/2 = 0.575 +The cluster as a whole has 300 CPU and 4,000 Memory. -Thus, in this example User A has a smaller average percentage of his or her resource guarantee satisfied than User B. Thus, User A should get priority to be allocated more resource, i.e., schedule a topology submitted by User A. +User A is guaranteed 100 CPU and 1,000 Memory. User B is guaranteed 200 CPU and 1,500 Memory. The scores for the most important, lowest priority number, topologies for each user would be. --- End diff -- "The scores for the most important, lowest priority number, topologies for each user would be." This sentence is a little confusing. ---
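The scoring rule in the quoted docs can be written out directly: per resource, (Requested + Assigned - Guaranteed) / Available, combined by taking the maximum across resources. A small stand-in (not Storm's actual DefaultSchedulingPriorityStrategy code; method and class names are illustrative) reproducing the A-1 and B-1 example:

```java
// Sketch of the scoring described in the quoted docs. For each resource,
// score = (Requested + Assigned - Guaranteed) / Available; the topology's
// overall score is the maximum across resources, so going over guarantee
// on one resource cannot be offset by headroom on another.
public class PriorityScore {
    static double score(double[] requested, double[] assigned,
                        double[] guaranteed, double[] available) {
        double max = Double.NEGATIVE_INFINITY;
        for (int i = 0; i < requested.length; i++) {
            max = Math.max(max,
                (requested[i] + assigned[i] - guaranteed[i]) / available[i]);
        }
        return max;
    }

    public static void main(String[] args) {
        // Cluster: 300 CPU, 4,000 Memory. User A guaranteed 100 / 1,000.
        double a1 = score(new double[]{100, 1000}, new double[]{0, 0},
                          new double[]{100, 1000}, new double[]{300, 4000});
        // User B guaranteed 200 / 1,500, so B-1 scores negative (within guarantee).
        double b1 = score(new double[]{100, 1000}, new double[]{0, 0},
                          new double[]{200, 1500}, new double[]{300, 4000});
        System.out.println(a1 + " " + b1);  // 0.0 and -0.125
    }
}
```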
[GitHub] storm pull request #2400: STORM-2792: Remove RAS EvictionPolicy and cleanup
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2400#discussion_r148923121 --- Diff: docs/Resource_Aware_Scheduler_overview.md --- @@ -243,58 +243,81 @@ http://dl.acm.org/citation.cfm?id=2814808 ### Specifying Topology Prioritization Strategy -The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies. For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface. A user can set the scheduling priority strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance: +The order of scheduling and eviction is determined by a pluggable interface in which the cluster owner can define how topologies should be scheduled. For the owner to define his or her own prioritization strategy, she or he needs to implement the ISchedulingPriorityStrategy interface. A user can set the scheduling priority strategy by setting the `DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY` to point to the class that implements the strategy. For instance: ``` resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy" ``` -A default strategy will be provided. The following explains how the default scheduling priority strategy works. + +Topologies are scheduled starting at the beginning of the list returned by this plugin. If there are not enough resources to schedule the topology others are evicted starting at the end of the list. Eviction stops when there are no lower priority topologies left to evict. **DefaultSchedulingPriorityStrategy** -The order of scheduling should be based on the distance between a user's current resource allocation and his or her guaranteed allocation. 
We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner? Let's use the average percentage of resource guarantees satisfied as a method of comparison. +In the past the order of scheduling was based on the distance between a user's current resource allocation and his or her guaranteed allocation. + +We currently use a slightly different approach. We simulate scheduling the highest priority topology for each user and score the topology for each of the resources using the formula + +``` +(Requested + Assigned - Guaranteed)/Available +``` + +Where + + * `Requested` is the resource requested by this topology (or a approximation of it for complex requests like shared memory) + * `Assigned` is the resources already assigned by the simulation. + * `Guaranteed` is the resource guarantee for this user + * `Available` is the amount of that resource currently available in the cluster. -For example: +This gives a score that is negative for guaranteed requests and a score that is positive for requests that are not within the guarantee. -|User|Resource Guarantee|Resource Allocated| -||--|--| -|A|<10 CPU, 50GB>|<2 CPU, 40 GB>| -|B|< 20 CPU, 25GB>|<15 CPU, 10 GB>| +To combine different resources the maximum of all the indavidual resource scores is used. This guarantees that if a user would go over a guarantee for a single resource it would not be offset by being under guarantee on any other resources. --- End diff -- "indavidual" is misspelled ---
[GitHub] storm issue #2376: STORM-2779 NPE on shutting down WindowedBoltExecutor (1.1...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2376 +1 ---
[GitHub] storm issue #2374: STORM-2779 NPE on shutting down WindowedBoltExecutor
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2374 +1 ---
[GitHub] storm pull request #2314: [STORM-2731] - Simple checks in Storm Windowing
GitHub user jerrypeng reopened a pull request: https://github.com/apache/storm/pull/2314 [STORM-2731] - Simple checks in Storm Windowing There is also inconsistent and mixed use of Longs and Ints throughout the windowing code. Perhaps we should just change all the numeric values to use longs? Especially since all calculations of time are done in milliseconds. It would also be nice to make clear in both the code and documentation that all timestamps and time-based calculations are in milliseconds. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-2731 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2314.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2314 commit a64199ac5b5eb59205241ff09cd1d74440ba922f Author: Jerry Peng Date: 2017-09-07T22:32:39Z [STORM-2731] - Simple checks in Storm Windowing commit fb7c7a0ff4496d60ed5742702945090ba7c5674a Author: Jerry Peng Date: 2017-09-19T20:46:32Z fixing exception message commit f657dfcfadb74c967d4ef8bf711e4c27af6ccf45 Author: Jerry Peng Date: 2017-09-19T20:55:08Z fixing lower case ---
[GitHub] storm issue #2314: [STORM-2731] - Simple checks in Storm Windowing
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2314 @srdo thanks for the review. Let's do the larger change in a separate PR. ---
[GitHub] storm pull request #2314: [STORM-2731] - Simple checks in Storm Windowing
Github user jerrypeng closed the pull request at: https://github.com/apache/storm/pull/2314 ---
[GitHub] storm issue #2331: [STORM-2743] Add logging to monitor how long scheduling i...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2331 +1, but perhaps it's worthwhile to look into using something like Dropwizard Metrics: http://metrics.dropwizard.io/3.2.3/manual/core.html ---
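For context, the kind of measurement this PR adds boils down to timing a scheduling run; a Dropwizard Metrics Timer would additionally keep a histogram of these durations. A plain-JDK stand-in (names are illustrative, not the PR's actual code):

```java
// Minimal sketch of timing a scheduling run with wall-clock time.
// A dropwizard-metrics Timer would record each duration into a histogram;
// this stand-in just measures and logs a single run.
public class SchedulingTimer {
    static long timeMillis(Runnable schedulingRun) {
        long start = System.currentTimeMillis();
        schedulingRun.run();
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        // Stand-in for the real call, e.g. scheduler.schedule(topologies, cluster)
        long elapsed = timeMillis(() -> { });
        System.out.println("Scheduling took " + elapsed + " ms");
    }
}
```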
[GitHub] storm issue #2314: [STORM-2731] - Simple checks in Storm Windowing
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2314 @srdo I totally agree with your idea. I looked into changing all time variables to be long and renaming all variables to have an "Ms" suffix. It will probably be a significant diff. ---
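A minimal sketch of the convention being discussed (all names hypothetical, not from the Storm codebase): hold every time value as a `long` in milliseconds and make the unit explicit with an `Ms` suffix:

```java
// Hypothetical sketch of the suggested convention: all time values are
// long milliseconds with an explicit "Ms" suffix on variable names.
public class WindowConfigExample {
    // long avoids int overflow for durations beyond ~24.8 days
    // (Integer.MAX_VALUE milliseconds) and avoids mixed int/long arithmetic.
    static long windowLengthMs = 10_000L;

    static long windowEndMs(long windowStartMs) {
        return windowStartMs + windowLengthMs;
    }

    public static void main(String[] args) {
        long startMs = 1_600_000_000_000L; // an epoch timestamp in ms
        System.out.println(windowEndMs(startMs)); // 1600000010000
    }
}
```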
[GitHub] storm pull request #2314: [STORM-2731] - Simple checks in Storm Windowing
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/2314 [STORM-2731] - Simple checks in Storm Windowing There is also inconsistent and mixed use of Longs and Ints throughout the windowing code. Perhaps we should just change all the numeric values to use longs? Especially since all calculation of time is done in milliseconds. It would also be nice to make clear in both the code and documentation that all timestamps and time-based calculations are in milliseconds. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-2731 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2314.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2314 commit 40d133b6c9f434eabc3b50bb8f4ebf91d530 Author: Jerry Peng Date: 2017-09-07T22:32:39Z [STORM-2731] - Simple checks in Storm Windowing ---
[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2199#discussion_r135584078 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -42,14 +43,16 @@ private Map conf; private ISchedulingPriorityStrategy schedulingPrioritystrategy; private IEvictionStrategy evictionStrategy; +private IConfigLoader configLoader; @Override public void prepare(Map conf) { this.conf = conf; schedulingPrioritystrategy = (ISchedulingPriorityStrategy) ReflectionUtils.newInstance( -(String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY)); +(String) conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY)); --- End diff -- is there a reason why we need to change the spacing in many places in this file? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2113 cool feature ---
[GitHub] storm issue #2143: STORM-2503: Restore comparator logic in `DefaultResourceA...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2143 @adityasharad Thanks for changing it back. What about the unit test that was modified as well? ---
[GitHub] storm issue #2100: STORM-2503: Fix lgtm.com alerts on equality and compariso...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2100 @adityasharad I agree with your change for sortComponents on line 544. Thank you for correcting that, but can you change the sorting logic of sortNeighbors back to what it was before, since I believe that was the correct sorting logic. Thanks! ---
[GitHub] storm issue #2100: STORM-2503: Fix lgtm.com alerts on equality and compariso...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2100 @adityasharad thanks for addressing these issues, but can you tell me why you decided to change the compare function I commented on above? @revans2 do you believe the sorting mentioned was incorrect before? ---
[GitHub] storm pull request #2100: STORM-2503: Fix lgtm.com alerts on equality and co...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2100#discussion_r118366750 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java --- @@ -567,9 +567,9 @@ public int compare(Component o1, Component o2) { public int compare(Component o1, Component o2) { int connections1 = o1.execs.size() * thisComp.execs.size(); int connections2 = o2.execs.size() * thisComp.execs.size(); -if (connections1 > connections2) { --- End diff -- hey guys this change is not right. This is supposed to sort by the number of connections in descending order so that tasks with the most connections to each other can be attempted to be colocated first. What is the reason for this change? ---
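For illustration, the descending-order intent described above can be sketched with a comparator whose arguments are reversed (names are illustrative, not the actual Storm code):

```java
import java.util.Arrays;

public class DescendingSortExample {
    // Reversing the arguments to Integer.compare yields DESCENDING order,
    // so the most-connected components come first and can be colocated first.
    static Integer[] sortDescending(Integer[] connectionCounts) {
        Arrays.sort(connectionCounts, (a, b) -> Integer.compare(b, a));
        return connectionCounts;
    }

    public static void main(String[] args) {
        Integer[] connections = {3, 12, 7};
        System.out.println(Arrays.toString(sortDescending(connections))); // [12, 7, 3]
    }
}
```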
[GitHub] storm issue #1973: STORM-2385: pacemaker_state_factory.clj does not compile ...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1973 +1 ---
[GitHub] storm issue #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1934 Thanks @revans2 +1 ---
[GitHub] storm issue #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1934 thanks @revans2 for taking the time to refactor some of the code! I reviewed it once more. If there were an example in the docs of how to use the metrics, I think that would be extremely helpful to users. ---
[GitHub] storm pull request #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1934#discussion_r102382536 --- Diff: storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java --- @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.cgroup; + +import java.util.Map; + +import org.apache.storm.container.cgroup.SubSystemType; +import org.apache.storm.container.cgroup.core.CgroupCore; +import org.apache.storm.container.cgroup.core.MemoryCore; + +/** + * Reports the current memory usage of the cgroup for this worker + */ +public class CGroupMemoryUsage extends CGroupMetricsBase { + +public CGroupMemoryUsage(Map conf) { +super(conf, SubSystemType.memory); +} + +@Override +public Long getDataFrom(CgroupCore core) throws Exception { --- End diff -- same question here ---
[GitHub] storm pull request #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1934#discussion_r102382461 --- Diff: storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java --- @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.cgroup; + +import java.util.Map; + +import org.apache.storm.container.cgroup.SubSystemType; +import org.apache.storm.container.cgroup.core.CgroupCore; +import org.apache.storm.container.cgroup.core.MemoryCore; + +/** + * Reports the current memory limit of the cgroup for this worker + */ +public class CGroupMemoryLimit extends CGroupMetricsBase { + +public CGroupMemoryLimit(Map conf) { +super(conf, SubSystemType.memory); +} + +@Override +public Long getDataFrom(CgroupCore core) throws Exception { --- End diff -- Same question here, but also: this method is declared to throw Exception when it only throws IOException. ---
[GitHub] storm pull request #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1934#discussion_r102382308 --- Diff: storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java --- @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric.cgroup; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.container.cgroup.SubSystemType; +import org.apache.storm.container.cgroup.core.CgroupCore; +import org.apache.storm.container.cgroup.core.CpuacctCore; +import org.apache.storm.container.cgroup.core.CpuacctCore.StatType; + +/** + * Report CPU used in the cgroup + */ +public class CGroupCpu extends CGroupMetricsBase> { +long previousSystem = 0; +long previousUser = 0; +private int userHz = -1; + +public CGroupCpu(Map conf) { +super(conf, SubSystemType.cpuacct); +} + +public synchronized int getUserHZ() throws IOException { +if (userHz < 0) { +ProcessBuilder pb = new ProcessBuilder("getconf", "CLK_TCK"); +Process p = pb.start(); +BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream())); +String line = in.readLine().trim(); +userHz = Integer.valueOf(line); +} +return userHz; +} + +@Override +public Map getDataFrom(CgroupCore core) throws IOException { +CpuacctCore cpu = (CpuacctCore) core; +Map stat = cpu.getCpuStat(); --- End diff -- is there a reason why we are not doing a try/catch of an IOException here, similar to cpu.getCpuShares() below, instead of just propagating the exception? ---
[GitHub] storm pull request #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1934#discussion_r102381366 --- Diff: docs/cgroups_in_storm.md --- @@ -43,13 +43,24 @@ group storm { } cpu { } + memory { + } + cpuacct { + } } ``` For a more detailed explanation of the format and configs for the cgconfig.conf file, please visit: https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/ch-Using_Control_Groups.html#The_cgconfig.conf_File +To let storm manage the cgroups for indavidual workers you need to make sure that the resources you want to control are mounted under the same directory as in the example above. +If they are not in the same directory the supervisor will throw an exception. + +The perm section needs to be configured so that the user the supervisor is running as can modify the group. + +If run as user is enabled so the supervisor spawns other processes as the user that launched the topology make sure that the permissions are such that indavidual users have read access but not write access. --- End diff -- perhaps cleaner: if "run as user" is enabled so **that** the supervisor spawns other... ---
[GitHub] storm issue #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1934 @revans2 this is a really cool feature and thanks for adding it. My only question is that I see you have implemented some parsers/getters for the cgroup metrics. There is already a set of them implemented in the cgroup package. Is there a reason why we shouldn't reuse them? ---
[GitHub] storm pull request #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1934#discussion_r100951683 --- Diff: storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java --- @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.cgroup; + +import java.util.Map; + +import org.apache.storm.container.cgroup.core.CpuCore; + +/** + * Report the guaranteed number of ms this worker has requested. + */ +public class CGroupCpuGuarantee extends CGroupMetricsBase { +long previousTime = -1; + +public CGroupCpuGuarantee(Map conf) { +super(conf, CpuCore.CPU_SHARES); +} + +@Override +public Long parseFileContents(String contents) { --- End diff -- Function already defined: https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java#L50 ---
[GitHub] storm pull request #1934: STORM-2333: CGroup memory and CPU metrics
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1934#discussion_r100951524 --- Diff: storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java --- @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.cgroup; + +import java.util.Map; + +/** + * Reports the current memory limit of the cgroup for this worker + */ +public class CGroupMemoryLimit extends CGroupMetricsBase { + +public CGroupMemoryLimit(Map conf) { +super(conf, "memory.limit_in_bytes"); +} + +@Override +public Long parseFileContents(String contents) { +return Long.parseLong(contents.trim()); --- End diff -- Many of these cgroup getter functions have already been implemented in the cgroup package: https://github.com/apache/storm/tree/master/storm-core/src/jvm/org/apache/storm/container/cgroup/core For example, there is already a method to get memory.limit_in_bytes: https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java#L133 ---
[GitHub] storm issue #1932: [STORM-2194] Stop ignoring socket timeout error from exec...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1932 +1 ---
[GitHub] storm issue #1812: Updating Trident RAS Documentation.
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1812 LGTM +1 ---
[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1764 LGTM +1 @revans2 thanks for making the optimizations ---
[GitHub] storm pull request #1764: STORM-2190: reduce contention between submission a...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1764#discussion_r90540863 --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj --- @@ -1008,23 +1008,24 @@ (reset! (:id->worker-resources nimbus) {})) ;; tasks figure out what tasks to talk to by looking at topology at runtime ;; only log/set when there's been a change to the assignment -(doseq [[topology-id assignment] new-assignments -:let [existing-assignment (get existing-assignments topology-id) - topology-details (.getById topologies topology-id)]] - (if (= existing-assignment assignment) -(log-debug "Assignment for " topology-id " hasn't changed") -(do - (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment)) - (.setAssignment storm-cluster-state topology-id (thriftify-assignment assignment)) - ))) -(->> new-assignments - (map (fn [[topology-id assignment]] -(let [existing-assignment (get existing-assignments topology-id)] - [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] - ))) - (into {}) - (.assignSlots inimbus topologies))) -(log-message "not a leader, skipping assignments"))) +(locking (:sched-lock nimbus) --- End diff -- maybe it's also time to start thinking about a decentralized scheduling mechanism, since certain scheduling strategies may take a while to compute a schedule, but that would require a major overhaul in nimbus. ---
[GitHub] storm pull request #1785: [STORM-2201] Add dynamic scheduler configuration l...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1785#discussion_r90526318 --- Diff: storm-core/src/jvm/org/apache/storm/Config.java --- @@ -2177,13 +2177,37 @@ @isMapEntryType(keyType = String.class, valueType = Number.class) public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools"; + /** + * A plugin that should load the user pools for the multitenant scheduler + */ +@isString +public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER = "multitenant.scheduler.user.pools.loader"; +/** + * Configuration elements for scheduler config loader + */ +@isMapEntryType(keyType = String.class, valueType = String.class) +public static final String MULTITENANT_SCHEDULER_USER_POOLS_LOADER_PARAMS = "multitenant.scheduler.user.pools.loader.params"; + /** * A map of users to another map of the resource guarantees of the user. Used by Resource Aware Scheduler to ensure * per user resource guarantees. */ @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {UserResourcePoolEntryValidator.class}) public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools"; + /** + * A plugin that should load the user pools for the resource aware scheduler + */ +@isString +public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS_LOADER = "resource.aware.scheduler.user.pools.loader"; --- End diff -- Yup that is what @isImplementationOfClass is for. We should use it ---
[GitHub] storm issue #1804: STORM-2222: Repeated NPEs thrown in nimbus if rebalance f...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1804 +1 ---
[GitHub] storm issue #1733: [STORM-2134] - improving the current scheduling strategy ...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1733 @revans2 thanks for the reviews. I have addressed your comments ---
[GitHub] storm issue #1733: [STORM-2134] - improving the current scheduling strategy ...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1733 Also re-added some RAS-related tests that were accidentally deleted in the past ---
[GitHub] storm pull request #1733: [STORM-2134] - improving the current scheduling st...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1733 [STORM-2134] - improving the current scheduling strategy for RAS You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-2134 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1733.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1733 commit d4e943794458f8177a518b68eb2efe09d48d0c6d Author: Boyang Jerry Peng Date: 2016-10-06T18:45:55Z [STORM-2134] - improving the current scheduling strategy for RAS commit caf525e63cbb441267b0f7ad4ba51b48675e4fb2 Author: Boyang Jerry Peng Date: 2016-10-06T19:25:51Z re-adding accidentally deleted RAS tests commit 0d616ba539a3e72b7c26fe74a7e8690f7b60a6e8 Author: Boyang Jerry Peng Date: 2016-10-10T03:44:32Z adding to documentation ---
[GitHub] storm issue #1713: Storm 2124 show requested cpu mem for each component
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1713 Did some manual testing with the UI and everything looks good +1 ---
[GitHub] storm issue #1654: STORM-2066: make error message in IsolatedPool.java more ...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1654 LGTM +1 ---
[GitHub] storm pull request #1708: [STORM-2119] - bug in log message printing to stdo...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1708 [STORM-2119] - bug in log message printing to stdout You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-2119 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1708.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1708 commit 967132b80245cdfd8de70cbe4112524c85b75392 Author: Boyang Jerry Peng Date: 2016-09-22T18:39:24Z [STORM-2119] - bug in log message printing to stdout ---
[GitHub] storm pull request #1670: [STORM-2079] - Unnecessary readStormConfig operat...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1670 [STORM-2079] - Unnecessary readStormConfig operation You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-2079 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1670.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1670 ---
[GitHub] storm pull request #1659: [STORM-2056] - Bugs in logviewer
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1659 [STORM-2056] - Bugs in logviewer You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-2056 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1659.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1659 commit 71fb390a0b31d3b48a238b6777c3a9c7cb481245 Author: Boyang Jerry Peng Date: 2016-08-30T20:38:05Z [STORM-2056] - Bugs in logviewer ---
[GitHub] storm issue #1619: Fix parent version of storm-druid and storm-kinesis
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1619 +1. Just faced this issue. Glad you already have a fix up. ---
[GitHub] storm pull request #1621: [STORM-1766] - A better algorithm server rack sele...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1621 [STORM-1766] - A better algorithm server rack selection for RAS Backport of #1398 to the 1.x branch. I'm not sure this actually needs a PR, but since it's been a while since #1500 was merged, I'll put one up anyway, since the code went into 2.x a while ago. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm 1.x-STORM-1766 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1621.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1621 commit d50a2437923f2919fbee130b8e9e86a62a2d9f48 Author: Boyang Jerry Peng Date: 2016-05-04T22:08:57Z [STORM-1766] - A better algorithm server rack selection for RAS ---
[GitHub] storm issue #1620: STORM-1913: Additions and Improvements for Trident RAS AP...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1620 +1 ---
[GitHub] storm issue #1592: STORM-1994: Add table with per-topology and worker resour...
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/1592 @HeartSaVioR I have pushed critical bug fixes for RAS to the 1.x branch. There have been a few algorithmic improvements to RAS that I have only put into 2.x, but I would like to get them into 1.x as well when I have time. I have found this feature to be extremely useful regardless of whether RAS is being used. It tells you which components are scheduled in each worker and the uptime of each worker, so you can monitor the stability of your topology. We have no clear timeline for when the translation of Nimbus will be done, and I am not sure it's good to wait for an uncertain amount of time. Thus I am +1 for this feature. ---
[GitHub] storm pull request #1510: [STORM-1866] - Update Resource Aware Scheduler Doc...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1510 [STORM-1866] - Update Resource Aware Scheduler Documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-1866 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1510.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1510 commit acf0ce193ed84ab51e3eff46fb4af7281ad8239e Author: Boyang Jerry Peng Date: 2016-06-22T17:19:27Z [STORM-1866] - Update Resource Aware Scheduler Documentation ---
[GitHub] storm pull request: [STORM-1766] - A better algorithm server rack ...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1398#issuecomment-221686962 @ptgoetz Oh, I see, thanks for letting me know! I will remember next time to put a comment in the JIRA noting which branches I merged the corresponding PR to. ---
[GitHub] storm pull request: [STORM-1766] - A better algorithm server rack ...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1398#issuecomment-221678273 @ptgoetz I just merged it into master. Why? ---
[GitHub] storm pull request: [STORM-1766] - A better algorithm server rack ...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1398#issuecomment-221665618 @ptgoetz I have created a JIRA: https://issues.apache.org/jira/browse/STORM-1866 ---
[GitHub] storm pull request: [STORM-1766] - A better algorithm server rack ...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1398#issuecomment-221314974 @redsanket thanks for the review. Do you have any other comments? ---
[GitHub] storm pull request: [STORM-1766] - A better algorithm server rack ...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1398#discussion_r64404386 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java --- @@ -45,6 +47,7 @@ import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.scheduler.resource.Component; + --- End diff -- will remove ---
[GitHub] storm pull request: [STORM-1631] - Storm CGroup bugs 1) when launc...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1216#issuecomment-220783660

At Yahoo, cgroups are used to limit both the memory and CPU of every worker so that topologies have more resource isolation.

On Saturday, May 21, 2016, Xin Wang wrote:
> @yegortokmakov <https://github.com/yegortokmakov> In general, for one topology:
> i. Worker number: worker number < cluster node number, one worker per node.
> ii. Thread number: this depends on your logic. For example, the parallelism of KafkaSpout should be equal to the number of partitions, in order to get good performance.
>
> By the way, I really recommend you upgrade your Storm to 1.0.1.
>
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/storm/pull/1216#issuecomment-220783029>
---
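The comment above notes that cgroups are used to cap each worker's memory and CPU. As a rough, hypothetical sketch of the arithmetic a supervisor would do when translating RAS-style resource assignments (CPU as a percentage of a core, memory in MB) into cgroup v1 CFS and memory limits — the class and method names here are invented for illustration, not Storm's actual code:

```java
// Hypothetical sketch: converting RAS-style resource assignments into the
// values one would write into cgroup v1 control files. Not Storm's real code.
public class CgroupLimits {
    // Default CFS scheduling period used by the kernel's cpu controller.
    static final long CFS_PERIOD_US = 100_000;

    // cpu.cfs_quota_us for a worker assigned `cpuPcorePercent`
    // (RAS convention: 100 = one full core, so 250 = 2.5 cores).
    public static long cpuQuotaUs(double cpuPcorePercent) {
        return (long) (CFS_PERIOD_US * cpuPcorePercent / 100.0);
    }

    // memory.limit_in_bytes for a worker assigned `memoryMb` megabytes.
    public static long memoryLimitBytes(double memoryMb) {
        return (long) (memoryMb * 1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println(cpuQuotaUs(250));        // 2.5 cores -> 250000
        System.out.println(memoryLimitBytes(2048)); // 2048 MB -> 2147483648
    }
}
```

With limits like these enforced per worker, one topology's runaway bolt cannot starve workers belonging to other topologies on the same node, which is the resource isolation the comment refers to.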
[GitHub] storm pull request: [STORM-1766] - A better algorithm server rack ...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1398 [STORM-1766] - A better algorithm server rack selection for RAS You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-1766 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1398.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1398 commit 848ef21df184d046ac6617ea2bce1efc00e13a13 Author: Boyang Jerry Peng Date: 2016-05-04T22:08:57Z [STORM-1766] - A better algorithm server rack selection for RAS ---
[GitHub] storm pull request: [STORM-1646] Fix Kafka unit tests
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1335#issuecomment-215831984 +1 ---
[GitHub] storm pull request: [STORM-1707] Remove two minute timeout after w...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1370#issuecomment-215826814 +1 ---
[GitHub] storm pull request: [STORM-1681] - Bug in scheduling cyclic topolo...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1307#issuecomment-208946081 Unrelated test failure. ---
[GitHub] storm pull request: [STORM-1681] - Bug in scheduling cyclic topolo...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1307 [STORM-1681] - Bug in scheduling cyclic topologies when scheduling with the RAS There is a bug in the BFS algorithm in RAS that does not correctly account for components already visited during the breadth-first traversal. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-1681 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1307.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1307 commit 77058f60a39bb19a422c1ca34e7893b4184e50ea Author: Boyang Jerry Peng Date: 2016-04-04T16:03:05Z [STORM-1681] - Bug in scheduling cyclic topologies when scheduling with RAS ---
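The kind of fix described above — a breadth-first traversal over the component graph that tracks already-visited components so a cyclic topology terminates — can be sketched as follows. This is an illustrative standalone example, not Storm's actual scheduler code; the component names and the adjacency-map graph representation are assumptions:

```java
import java.util.*;

// Sketch of BFS over a topology's component graph with a visited set,
// so a cycle (e.g. spout -> boltA -> boltB -> spout) does not loop forever.
public class ComponentBfs {
    public static List<String> bfsOrder(Map<String, List<String>> adj, String root) {
        List<String> order = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(root);
        visited.add(root); // mark on enqueue, not on dequeue
        while (!queue.isEmpty()) {
            String comp = queue.poll();
            order.add(comp);
            for (String next : adj.getOrDefault(comp, Collections.emptyList())) {
                if (visited.add(next)) { // skip components already seen
                    queue.add(next);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> adj = new HashMap<>();
        adj.put("spout", Arrays.asList("boltA"));
        adj.put("boltA", Arrays.asList("boltB"));
        adj.put("boltB", Arrays.asList("spout")); // cycle back to the spout
        System.out.println(bfsOrder(adj, "spout")); // [spout, boltA, boltB]
    }
}
```

Without the `visited` check, the cycle back to the spout would re-enqueue components indefinitely, which matches the symptom the PR title describes for cyclic topologies.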
[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1232#issuecomment-202514705 +1 ---
[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1232#discussion_r57491392

--- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
@@ -54,6 +61,681 @@
 private static int currentTime = 1450418597;
+private static final Config defaultTopologyConf = new Config();
+
+@BeforeClass
+public static void initConf() {
+    defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+    defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+    defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+
+    defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+    defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+    defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
+    defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+    defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
+    defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
+    defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+}
+
+@Test
+public void testRASNodeSlotAssign() {
+    INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+    Map resourceMap = new HashMap<>();
+    resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+    resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+    Map supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
+    Topologies topologies = new Topologies(new HashMap());
+    Cluster cluster = new Cluster(iNimbus, supMap, new HashMap(), new HashMap());
+    Map nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
+    Assert.assertEquals(5, nodes.size());
+    RAS_Node node = nodes.get("sup-0");
+
+    Assert.assertEquals("sup-0", node.getId());
+    Assert.assertTrue(node.isAlive());
+    Assert.assertEquals(0, node.getRunningTopologies().size());
+    Assert.assertTrue(node.isTotallyFree());
+    Assert.assertEquals(4, node.totalSlotsFree());
+    Assert.assertEquals(0, node.totalSlotsUsed());
+    Assert.assertEquals(4, node.totalSlots());
+
+    TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
+
+    List executors11 = new ArrayList<>();
+    executors11.add(new ExecutorDetails(1, 1));
+    node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
+    Assert.assertEquals(1, node.getRunningTopologies().size());
+    Assert.assertFalse(node.isTotallyFree());
+    Assert.assertEquals(3, node.totalSlotsFree());
+    Assert.assertEquals(1, node.totalSlotsUsed());
+    Assert.assertEquals(4, node.totalSlots());
+
+    List executors12 = new ArrayList<>();
+    executors12.add(new ExecutorDetails(2, 2));
+    node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
+    Assert.assertEquals(1, node.getRunningTopologies().size());
+    Assert.assertFalse(node.isTotallyFree());
+    Assert.assertEquals(2, node.totalSlotsFree());
+    Assert.assertEquals(2, node.totalSlotsUsed());
+    Assert.assertEquals(4, node.totalSlots());
+
+    TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
+
+    List executors21 = new ArrayList<>();
+    executors21.add(new ExecutorDetails(1, 1));
+    node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
+    Assert.assertEquals(2, node.getRunningTopologies().size());
+    Assert.assertFalse(node.isTotallyFree());
+    Assert.assertEquals(1, node.totalSlotsFree());
+    Assert.assertEquals(3, node.totalSlotsUsed());
+    Assert.assertEquals(4, node.totalSlots());
+
+    List executors22 = new ArrayList<>();
+    executors22.add(new ExecutorDetails(2, 2));
+    node.assign(node.getFreeSlots().iterator().next(
[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1232#discussion_r57486697 --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java --- (same @@ -54,6 +61,681 @@ hunk as quoted in the previous comment) ---
[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1232#discussion_r57485387 --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java --- (same @@ -54,6 +61,681 @@ hunk as quoted in the comment above) ---
[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1232#discussion_r57485480 --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java --- (same @@ -54,6 +61,681 @@ hunk as quoted in the comment above) ---
[GitHub] storm pull request: Adding documentation for Trident RAS API
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1256#discussion_r57391018 --- Diff: docs/Trident-RAS-API.md --- @@ -0,0 +1,49 @@ +--- +title: Trident RAS API +layout: documentation +documentation: true +--- + +## Trident RAS API + +The Trident RAS (Resource Aware Scheduler) API provides a mechanism to specify the resource consumption of their topology. The API looks exactly like the base RAS API, only it is called on Trident Streams instead of Bolts and Spouts. + +In order to avoid duplication and inconsistency in documentation, the purpose and effects of resource setting are not described here, but are instead found in the [Resource Aware Scheduler Overview](Resource_Aware_Scheduler_overview.html) + +### Use + +First an example: + +```java +TridentTopology topo = new TridentTopology(); +TridentState wordCounts = +topology +.newStream("words", feeder) +.parallelismHint(5) +.setCPULoad(20) +.setMemoryLoad(512,256) +.each( new Fields("sentence"), new Split(), new Fields("word")) +.setCPULoad(10) +.setMemoryLoad(512) +.each(new Fields("word"), new BangAdder(), new Fields("word!")) +.parallelismHint(10) +.setCPULoad(50) +.setMemoryLoad(1024) +.groupBy(new Fields("word!")) +.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) +.setCPULoad(100) +.setMemoryLoad(2048); +``` + +Resources can be set per operation (except for grouping, shuffling, partitioning). +Operations that are combined by Trident into single Bolts have their resources summed. + +Every Bolt is given **at least** the default resources, regardless of user settings. + +In the above case, we end up with --- End diff -- The aforementioned Trident Topology example will be transformed into a Storm Topology with the following components: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: Adding documentation for Trident RAS API
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1256#discussion_r57389473 --- Diff: docs/Trident-RAS-API.md --- @@ -0,0 +1,49 @@ +--- +title: Trident RAS API +layout: documentation +documentation: true +--- + +## Trident RAS API + +The Trident RAS (Resource Aware Scheduler) API provides a mechanism to specify the resource consumption of their topology. The API looks exactly like the base RAS API, only it is called on Trident Streams instead of Bolts and Spouts. + +In order to avoid duplication and inconsistency in documentation, the purpose and effects of resource setting are not described here, but are instead found in the [Resource Aware Scheduler Overview](Resource_Aware_Scheduler_overview.html) + +### Use + +First an example: + +```java +TridentTopology topo = new TridentTopology(); +TridentState wordCounts = +topology +.newStream("words", feeder) +.parallelismHint(5) +.setCPULoad(20) +.setMemoryLoad(512,256) +.each( new Fields("sentence"), new Split(), new Fields("word")) +.setCPULoad(10) +.setMemoryLoad(512) +.each(new Fields("word"), new BangAdder(), new Fields("word!")) +.parallelismHint(10) +.setCPULoad(50) +.setMemoryLoad(1024) +.groupBy(new Fields("word!")) +.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) +.setCPULoad(100) +.setMemoryLoad(2048); +``` + +Resources can be set per operation (except for grouping, shuffling, partitioning). +Operations that are combined by Trident into single Bolts have their resources summed. + +Every Bolt is given **at least** the default resources, regardless of user settings. + +In the above case, we end up with + * a spout and spout coordinator with a CPU load of 20% each, and a memory load of 512MiB on heap and 256MiB off heap. + * a bolt with 60% cpu load (10% + 50%) and a memory load of 1536MiB (1024 + 512) on heap from the combined `Split` and `BangAdder` + * a bolt with 100% cpu load and a memory load of 2048MiB. 
+ +The methods can be called for every operation (or some of the operations) or used in the same manner as `parallelismHint()`. --- End diff -- reword: The RAS API can be used in the same manner as `parallelismHint()`, i.e. resource declarations have the same *boundaries* as parallelism hints. ---
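As the quoted documentation notes, operations that Trident packs into a single Bolt have their resource declarations summed: the combined `Split` + `BangAdder` bolt ends up with 60% CPU (10% + 50%) and 1536 MiB on-heap (512 + 1024). A minimal, self-contained sketch of that arithmetic in plain Java (the `Op` type and helper methods are illustrative, not Storm's actual combiner code):

```java
import java.util.Arrays;
import java.util.List;

public class TridentResourceSum {
    // One operation's declared resources: CPU % and on-heap MiB.
    static final class Op {
        final String name;
        final double cpuLoad;
        final double onHeapMiB;
        Op(String name, double cpuLoad, double onHeapMiB) {
            this.name = name;
            this.cpuLoad = cpuLoad;
            this.onHeapMiB = onHeapMiB;
        }
    }

    // When several operations land in one Bolt, their CPU loads add up.
    static double combinedCpu(List<Op> ops) {
        double total = 0;
        for (Op op : ops) total += op.cpuLoad;
        return total;
    }

    // Likewise for on-heap memory declarations.
    static double combinedOnHeap(List<Op> ops) {
        double total = 0;
        for (Op op : ops) total += op.onHeapMiB;
        return total;
    }

    public static void main(String[] args) {
        // Split (10% CPU, 512 MiB) and BangAdder (50% CPU, 1024 MiB)
        // are combined into the same Bolt in the example topology.
        List<Op> combinedBolt = Arrays.asList(
                new Op("Split", 10, 512),
                new Op("BangAdder", 50, 1024));
        System.out.println(combinedCpu(combinedBolt));    // prints 60.0
        System.out.println(combinedOnHeap(combinedBolt)); // prints 1536.0
    }
}
```

Note that, per the quoted doc, each combined Bolt still receives at least the default resources regardless of what this sum works out to.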
[GitHub] storm pull request: Adding documentation for Trident RAS API
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1256#discussion_r57387622 --- Diff: docs/Trident-RAS-API.md --- @@ -0,0 +1,49 @@ +--- +title: Trident RAS API +layout: documentation +documentation: true +--- + +## Trident RAS API + +The Trident RAS (Resource Aware Scheduler) API provides a mechanism to specify the resource consumption of their topology. The API looks exactly like the base RAS API, only it is called on Trident Streams instead of Bolts and Spouts. + +In order to avoid duplication and inconsistency in documentation, the purpose and effects of resource setting are not described here, but are instead found in the [Resource Aware Scheduler Overview](Resource_Aware_Scheduler_overview.html) + +### Use + +First an example: + +```java +TridentTopology topo = new TridentTopology(); +TridentState wordCounts = +topology +.newStream("words", feeder) +.parallelismHint(5) +.setCPULoad(20) +.setMemoryLoad(512,256) +.each( new Fields("sentence"), new Split(), new Fields("word")) +.setCPULoad(10) +.setMemoryLoad(512) +.each(new Fields("word"), new BangAdder(), new Fields("word!")) +.parallelismHint(10) +.setCPULoad(50) +.setMemoryLoad(1024) +.groupBy(new Fields("word!")) +.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) +.setCPULoad(100) +.setMemoryLoad(2048); +``` + +Resources can be set per operation (except for grouping, shuffling, partitioning). +Operations that are combined by Trident into single Bolts have their resources summed. --- End diff -- ... into single Bolts **will** have their ... ---
[GitHub] storm pull request: Adding documentation for Trident RAS API
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1256#discussion_r57387004 --- Diff: docs/Trident-RAS-API.md --- @@ -0,0 +1,49 @@ +--- +title: Trident RAS API +layout: documentation +documentation: true +--- + +## Trident RAS API + +The Trident RAS (Resource Aware Scheduler) API provides a mechanism to specify the resource consumption of their topology. The API looks exactly like the base RAS API, only it is called on Trident Streams instead of Bolts and Spouts. + +In order to avoid duplication and inconsistency in documentation, the purpose and effects of resource setting are not described here, but are instead found in the [Resource Aware Scheduler Overview](Resource_Aware_Scheduler_overview.html) + +### Use + +First an example: + +```java +TridentTopology topo = new TridentTopology(); +TridentState wordCounts = +topology +.newStream("words", feeder) +.parallelismHint(5) +.setCPULoad(20) +.setMemoryLoad(512,256) +.each( new Fields("sentence"), new Split(), new Fields("word")) +.setCPULoad(10) +.setMemoryLoad(512) +.each(new Fields("word"), new BangAdder(), new Fields("word!")) +.parallelismHint(10) +.setCPULoad(50) +.setMemoryLoad(1024) +.groupBy(new Fields("word!")) +.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) +.setCPULoad(100) +.setMemoryLoad(2048); +``` + +Resources can be set per operation (except for grouping, shuffling, partitioning). --- End diff -- Resources can be set **for each** operation ---
[GitHub] storm pull request: Adding documentation for Trident RAS API
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1256#discussion_r57386690 --- Diff: docs/Trident-RAS-API.md --- @@ -0,0 +1,49 @@ +--- +title: Trident RAS API +layout: documentation +documentation: true +--- + +## Trident RAS API + +The Trident RAS (Resource Aware Scheduler) API provides a mechanism to specify the resource consumption of their topology. The API looks exactly like the base RAS API, only it is called on Trident Streams instead of Bolts and Spouts. + +In order to avoid duplication and inconsistency in documentation, the purpose and effects of resource setting are not described here, but are instead found in the [Resource Aware Scheduler Overview](Resource_Aware_Scheduler_overview.html) + +### Use + +First an example: --- End diff -- First, ---
[GitHub] storm pull request: Adding documentation for Trident RAS API
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1256#discussion_r57385884 --- Diff: docs/Trident-RAS-API.md --- @@ -0,0 +1,49 @@ +--- +title: Trident RAS API +layout: documentation +documentation: true +--- + +## Trident RAS API + +The Trident RAS (Resource Aware Scheduler) API provides a mechanism to specify the resource consumption of their topology. The API looks exactly like the base RAS API, only it is called on Trident Streams instead of Bolts and Spouts. --- End diff -- grammar: ... the resource consumption of **a Trident** topology ... ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57228512 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry entry : allAssignment.entrySet()) { +if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) { +newAssignment.put(entry.getKey(), entry.getValue()); + assignedStormIds.add(entry.getValue().get_topology_id()); +} +
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57182643 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. 
to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +Map assignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map localWorkerStats = 
getLocalWorkerStats(supervisorData, assignedExecutors, now); + +Set keeperWorkerIds = new HashSet<>(); +Set keepPorts = new HashSet<&
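The numbered steps in the class comment quoted above describe the supervisor's process-sync loop: kill workers that are dead or no longer assigned, then launch workers for any assignment slots not yet satisfied. A condensed sketch of that control flow over plain maps (all names here are hypothetical; the real class works against Storm's LocalState and OS processes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class SyncProcessSketch {
    // sync() models the quoted steps over plain collections:
    //   running  : port -> topology id of each live worker
    //   assigned : port -> topology id the scheduler wants on that port
    static Map<Integer, String> sync(Map<Integer, String> running,
                                     Map<Integer, String> assigned) {
        Map<Integer, String> next = new TreeMap<>();
        // Steps 1-2: keep only workers whose port still carries the same
        // topology; any other running worker would be killed.
        for (Map.Entry<Integer, String> w : running.entrySet()) {
            if (w.getValue().equals(assigned.get(w.getKey()))) {
                next.put(w.getKey(), w.getValue());
            }
        }
        // Steps 3-6: every assignment slot not yet satisfied gets a fresh
        // worker (new worker id, local dir, launch, wait for startup).
        for (Map.Entry<Integer, String> a : assigned.entrySet()) {
            next.putIfAbsent(a.getKey(), a.getValue());
        }
        return next;
    }

    public static void main(String[] args) {
        Map<Integer, String> running = new HashMap<>();
        running.put(6700, "topo-A");
        running.put(6701, "topo-B"); // no longer assigned: would be killed
        Map<Integer, String> assigned = new HashMap<>();
        assigned.put(6700, "topo-A");
        assigned.put(6702, "topo-C"); // not running yet: would be launched
        System.out.println(sync(running, assigned)); // prints {6700=topo-A, 6702=topo-C}
    }
}
```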
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57181897 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. 
to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +Map assignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map localWorkerStats = 
getLocalWorkerStats(supervisorData, assignedExecutors, now); + +Set keeperWorkerIds = new HashSet<>(); +Set keepPorts = new HashSet<&
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57171985 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry entry : allAssignment.entrySet()) { +if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) { +newAssignment.put(entry.getKey(), entry.getValue()); + assignedStormIds.add(entry.getValue().get_topology_id()); +} +
[GitHub] storm pull request: [STORM-1623] fix bug about nimbus.clj
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1211#issuecomment-197672345 +1 ---
[GitHub] storm pull request: [STORM-1636] - Supervisor shutdown with worker...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1226#issuecomment-197700574 Also needs to be pushed back into 1.x branch ---
[GitHub] storm pull request: [STORM-1634] - Minor Refactoring of Resource A...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1224#issuecomment-197671679 Unrelated travis-ci failure ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197701084 Found another bug in the supervisor: https://github.com/apache/storm/pull/1226. Just another note here to make sure we remember to get the fix in, whether in Clojure or Java. ---
[GitHub] storm pull request: [STORM-1636] - Supervisor shutdown with worker...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1226 [STORM-1636] - Supervisor shutdown with worker id pass in being nil In kill-existing-workers-with-change-in-components in supervisor.clj, the function tries to detect whether there is a change in assignment. The bug is that the ordering of the assignment matters when it should not: for example, if a worker's assignment is [[1 1] [2 2]] and it changes to [[2 2] [1 1]], the supervisor will restart the worker. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm STORM-1636 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1226.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1226 commit 580ed9d2c4b012701fb6c1a307afe86eaf32863e Author: Boyang Jerry Peng Date: 2016-03-17T04:45:42Z [STORM-1636] - Supervisor shutdown with worker id pass in being nil ---
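The ordering bug described above amounts to comparing two executor lists positionally when they should be compared as sets. A minimal sketch of the difference in plain Java (class and method names are hypothetical, not the actual supervisor.clj code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class AssignmentCompare {
    // Order-sensitive comparison: [[1 1] [2 2]] vs [[2 2] [1 1]] differ,
    // which is the buggy behavior that triggers a spurious restart.
    static boolean sameAsLists(List<List<Integer>> a, List<List<Integer>> b) {
        return a.equals(b);
    }

    // Order-insensitive comparison: the same executors in a different
    // order are still the same assignment, so no restart is needed.
    static boolean sameAsSets(List<List<Integer>> a, List<List<Integer>> b) {
        return new HashSet<>(a).equals(new HashSet<>(b));
    }

    public static void main(String[] args) {
        List<List<Integer>> before =
                Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2));
        List<List<Integer>> after =
                Arrays.asList(Arrays.asList(2, 2), Arrays.asList(1, 1));
        System.out.println(sameAsLists(before, after)); // prints false
        System.out.println(sameAsSets(before, after));  // prints true
    }
}
```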
[GitHub] storm pull request: [STORM-1634] - Minor Refactoring of Resource A...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1224 [STORM-1634] - Minor Refactoring of Resource Aware Scheduler You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerrypeng/storm ras_refactor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1224.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1224 commit ac5a8fb695bb2a31e3907be5f0204357e5440d53 Author: Boyang Jerry Peng Date: 2016-03-16T20:27:41Z [STORM-1634] - Minor Refactoring of Resource Aware Scheduler ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197042280

I have a bug fix for cgroups that involves a couple of lines of modification in supervisor.clj: https://github.com/apache/storm/pull/1216. If that pull request gets merged first, please remember to also apply that change here.
---
[GitHub] storm pull request: [STORM-1631] - Storm CGroup bug when launching...
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1216

[STORM-1631] - Storm CGroup bug when launching workers as the user that submitted the topology

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerrypeng/storm STORM-1631

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1216.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1216

commit f679dac1733e5c20fabfda34634d0470466eb539
Author: Boyang Jerry Peng
Date: 2016-03-15T21:43:58Z

[STORM-1631] - Storm CGroup bug when launching workers as the user that submitted the topology
---
[GitHub] storm pull request: STORM-1616: Add RAS API for Trident
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/1199#issuecomment-196983397

As the original author of RAS, I am +1 on the current API. The most important thing for me is to maintain consistency between the APIs for setting resource load in Trident and in core Storm.
---
[GitHub] storm pull request: Fix minor bug in RAS Tests
GitHub user jerrypeng opened a pull request: https://github.com/apache/storm/pull/1207

Fix minor bug in RAS Tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerrypeng/storm jerrypeng-patch-1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1207.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1207

commit 56ffe77b16f354dac18682e0bf2866347ac5cb6b
Author: Boyang Jerry Peng
Date: 2016-03-11T20:21:16Z

Fix minor bug in RAS Tests
---
[GitHub] storm pull request: STORM-1616: Add RAS API for Trident
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1199#discussion_r55869630

--- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.ResourceDeclarer;
+
+public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
+
+    private Map resources = new HashMap<>();
+    private Map conf = Utils.readStormConfig();
+
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap) {
+        return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+    }
+
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+        if (onHeap != null) {
+            onHeap = onHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        }
+        if (offHeap != null) {
+            offHeap = offHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+        }
+        return this;
+    }
+
+    @Override
+    public DefaultResourceDeclarer setCPULoad(Number amount) {
+        if (amount != null) {
+            amount = amount.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
+        }
+        return this;
+    }
+
+    @Override
+    public Map getResources() {
--- End diff --

Sorry, misread the code. Thought the code returned an empty map.
---
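In the diff above, the one-argument setMemoryLoad delegates to the two-argument overload, filling in the off-heap value from the cluster config, and every Number is normalized to a Double before being stored in the resources map. A minimal standalone sketch of that pattern (the class name, key strings, and the 768.0 default are illustrative stand-ins, not Storm's actual config plumbing):

```java
import java.util.HashMap;
import java.util.Map;

public class MemoryLoadSketch {
    // Hypothetical stand-ins for Storm's Config.TOPOLOGY_COMPONENT_* keys.
    static final String ONHEAP  = "topology.component.resources.onheap.memory.mb";
    static final String OFFHEAP = "topology.component.resources.offheap.memory.mb";

    private final Map<String, Object> conf = new HashMap<>();
    private final Map<String, Double> resources = new HashMap<>();

    MemoryLoadSketch() {
        // Stand-in for a value that Utils.readStormConfig() would supply.
        conf.put(OFFHEAP, 768.0);
    }

    // One-arg overload delegates to the two-arg one, filling in the
    // configured off-heap default -- the same shape as the diff above.
    MemoryLoadSketch setMemoryLoad(Number onHeap) {
        return setMemoryLoad(onHeap, (Number) conf.get(OFFHEAP));
    }

    // Normalize any Number (Integer, Long, ...) to a Double; ignore nulls.
    MemoryLoadSketch setMemoryLoad(Number onHeap, Number offHeap) {
        if (onHeap != null)  resources.put(ONHEAP,  onHeap.doubleValue());
        if (offHeap != null) resources.put(OFFHEAP, offHeap.doubleValue());
        return this;
    }

    Map<String, Double> getResources() {
        return resources;
    }

    public static void main(String[] args) {
        Map<String, Double> r = new MemoryLoadSketch().setMemoryLoad(512).getResources();
        // On-heap 512 was normalized to 512.0; off-heap picked up the 768.0 default.
        System.out.println(r);
    }
}
```

Accepting Number and normalizing to Double keeps the builder convenient for callers passing int literals while keeping the stored map uniform.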
[GitHub] storm pull request: STORM-1616: Add RAS API for Trident
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/1199#discussion_r55765793

--- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.ResourceDeclarer;
+
+public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
+
+    private Map resources = new HashMap<>();
+    private Map conf = Utils.readStormConfig();
+
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap) {
+        return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+    }
+
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+        if (onHeap != null) {
+            onHeap = onHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        }
+        if (offHeap != null) {
+            offHeap = offHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+        }
+        return this;
+    }
+
+    @Override
+    public DefaultResourceDeclarer setCPULoad(Number amount) {
+        if (amount != null) {
+            amount = amount.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
+        }
+        return this;
+    }
+
+    @Override
+    public Map getResources() {
--- End diff --

Why do we need getResources(), and thus why does DefaultResourceDeclarer need to implement ITridentResource?
---