[HELIX-788] HELIX: Fix DefaultPipeline so that it doesn't rebalance task resources
Helix CHO testing indicated that the default pipeline was rebalancing task framework resources. This RB fixes this. Changelist: 1. Change resourceMap to resourceToRebalance, which separates generic and task resources 2. Make logger use LogUtil to distinguish two pipelines Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/59536d39 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/59536d39 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/59536d39 Branch: refs/heads/master Commit: 59536d39c85d3535408a40a46a1a60a4105ee6e4 Parents: dc25bac Author: narendly <[email protected]> Authored: Fri Nov 2 14:19:16 2018 -0700 Committer: narendly <[email protected]> Committed: Fri Nov 2 14:19:16 2018 -0700 ---------------------------------------------------------------------- .../stages/IntermediateStateCalcStage.java | 29 ++++++++++---------- .../stages/ResourceComputationStage.java | 2 +- .../stages/TestStateTransitionPrirority.java | 3 +- 3 files changed, 18 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 915a90f..9768bf7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -64,18 +64,19 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { BestPossibleStateOutput bestPossibleStateOutput = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); - Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); + Map<String, Resource> resourceToRebalance = + event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); - if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null + if (currentStateOutput == null || bestPossibleStateOutput == null || resourceToRebalance == null || cache == null) { throw new StageException(String.format("Missing attributes in event: %s. " - + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)", - event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache)); + + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)", + event, currentStateOutput, bestPossibleStateOutput, resourceToRebalance, cache)); } IntermediateStateOutput intermediateStateOutput = - compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput); + compute(event, resourceToRebalance, currentStateOutput, bestPossibleStateOutput); event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput); // Make sure no instance has more replicas/partitions assigned than maxPartitionPerInstance. If @@ -146,9 +147,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { String resourceName = resourcePriority.getResourceName(); if (!bestPossibleStateOutput.containsResource(resourceName)) { - logger.warn( - "Skip calculating intermediate state for resource {} because the best possible state is not available.", - resourceName); + LogUtil.logInfo(logger, _eventId, String.format( + "Skip calculating intermediate state for resource %s because the best possible state is not available.", + resourceName)); continue; } @@ -228,8 +229,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { instancePartitionCounts.put(instance, 0); } int partitionCount = instancePartitionCounts.get(instance); // Number of replicas (from - // different partitions) held - // in this instance + // different partitions) held + // in this instance partitionCount++; if (partitionCount > maxPartitionPerInstance) { HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); @@ -376,7 +377,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { // ErrorOrRecovery is set threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance(); partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is - // set + // set } else { if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) { // 0 is the default value so the old threshold has been set @@ -736,8 +737,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { return RebalanceType.NONE; // No further action required } else { return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to - // achieve BestPossibleState, load balance may be required - // to shift replicas around + // achieve BestPossibleState, load balance may be required + // to shift replicas around } } @@ -904,4 +905,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { return matchedState; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 02a175a..15bae65 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -54,7 +54,7 @@ public class ResourceComputationStage extends AbstractBaseStage { Map<String, IdealState> idealStates = cache.getIdealStates(); - Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>(); + Map<String, Resource> resourceMap = new LinkedHashMap<>(); Map<String, Resource> resourceToRebalance = new LinkedHashMap<>(); if (idealStates != null && idealStates.size() > 0) { http://git-wip-us.apache.org/repos/asf/helix/blob/59536d39/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java index ef5139e..d4a6aaf 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPrirority.java @@ -176,7 +176,8 @@ public class TestStateTransitionPrirority extends BaseStageTest { event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(resourceName, resource)); - event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resource); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), + Collections.singletonMap(resourceName, resource)); event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); runStage(event, new ReadClusterDataStage());
