This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new c480eac01 [apache/helix] -- Issue during onboarding resources without instances (#2782) c480eac01 is described below commit c480eac018932214886aaf55f3c28107d46e84b0 Author: Himanshu Kandwal <himanshuk...@gmail.com> AuthorDate: Mon Apr 1 11:06:40 2024 -0700 [apache/helix] -- Issue during onboarding resources without instances (#2782) When adding a new WAGED resource with a tag and without any instances against that tag, we are observing NPE coming from the system. To solve this issue we are adding a check in the ResourceComputationStage to have such resources excluded from the pipeline computation and only be considered when there are actual resource partitions (>0) to be assigned to the instances. --- .../stages/ResourceComputationStage.java | 2 +- ...xpansionWithAddingResourcesBeforeInstances.java | 189 +++++++++++++++++++++ 2 files changed, 190 insertions(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 00b2fd71b..e1d9a9215 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -91,7 +91,7 @@ public class ResourceComputationStage extends AbstractBaseStage { Map<String, IdealState> idealStates, boolean isTaskCache) { if (idealStates != null && idealStates.size() > 0) { for (IdealState idealState : idealStates.values()) { - if (idealState == null) { + if (idealState == null || idealState.getNumPartitions() == 0) { continue; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansionWithAddingResourcesBeforeInstances.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansionWithAddingResourcesBeforeInstances.java new file mode 100644 index 000000000..de7aff693 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansionWithAddingResourcesBeforeInstances.java @@ -0,0 +1,189 @@ +package org.apache.helix.integration.rebalancer.WagedRebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.helix.model.BuiltInStateModelDefinitions.LeaderStandby; +import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY; + +public class TestWagedClusterExpansionWithAddingResourcesBeforeInstances extends ZkTestBase { + private static final long TIMEOUT = 10 * 1000L; + protected static final AtomicLong PORT_GENERATOR = new AtomicLong(12918); + protected static final int PARTITIONS = 4; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + protected HelixClusterVerifier _clusterVerifier; + + List<MockParticipantManager> _participants = new ArrayList<>(); + Set<String> _allDBs = new HashSet<>(); + int _replica = 3; + + @BeforeClass + public void setupCluster() throws Exception { + _gSetupTool.addCluster(CLUSTER_NAME, true); + + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology("/zone/instance"); + clusterConfig.setFaultZoneType("zone"); + clusterConfig.setDelayRebalaceEnabled(true); + // Set a long enough time to ensure delayed rebalance is activate + clusterConfig.setRebalanceDelayTime(3000000); + + Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>(); + preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0); + preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10); + clusterConfig.setGlobalRebalancePreference(preference); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + // create resource with instances + String testResource1 = "Test-resource-1"; + createResource(testResource1, 4, 4, "Tag-1", true); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); + _gSetupTool.rebalanceResource(CLUSTER_NAME, testResource1, _replica); + _allDBs.add(testResource1); + + _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).setResources(_allDBs) + .build(); + Assert.assertTrue(_clusterVerifier.verify(12000)); + } + + private List<String> createResource( + String resourceName, int numInstances, int numPartitions, String tagName, boolean enableParticipants) { + List<String> nodes = new ArrayList<>(); + for (int i = 0; i < numInstances; i++) { + nodes.add(addInstance(new ConfigAccessor(_gZkClient), "zone-" + i % numInstances, tagName, enableParticipants)); + } + + createResourceWithWagedRebalance(CLUSTER_NAME, resourceName, LeaderStandby.name(), numPartitions, _replica, _replica - 1); + IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName); + idealState.setInstanceGroupTag(tagName); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, resourceName, idealState); + return nodes; + } + + private String addInstance(ConfigAccessor configAccessor, String zone, String instanceTag, boolean enabled) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + PORT_GENERATOR.incrementAndGet(); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _gSetupTool.addInstanceTag(CLUSTER_NAME, storageNodeName, instanceTag); + String domain = String.format("zone=%s,instance=%s", zone, storageNodeName); + + InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); + instanceConfig.setDomain(domain); + instanceConfig.setInstanceEnabled(enabled); + _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig); + + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + if (enabled) { + // start dummy participant + participant.syncStart(); + } + _participants.add(participant); + + return storageNodeName; + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (MockParticipantManager p : _participants) { + p.syncStop(); + } + deleteCluster(CLUSTER_NAME); + } + + @Test + public void testExpandClusterWithResourceWithoutInstances() throws Exception { + // Set-up a WAGED resource without any instances and let cluster rebalance successfully. + String testResource2 = "Test-resource-2"; + String testResourceTagName = "Tag-2"; + createResource(testResource2, 0, 0, testResourceTagName, false); + + _gSetupTool.rebalanceResource(CLUSTER_NAME, testResource2, _replica); + _allDBs.add(testResource2); + + ZkHelixClusterVerifier _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME) + .setZkClient(_gZkClient) + .setResources(_allDBs) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + checkRebalanceFailureGauge(false); + } + + private void checkRebalanceFailureGauge(final boolean expectFailure) throws Exception { + boolean result = TestHelper.verify(() -> { + try { + Long value = + (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge"); + return value != null && (value == 1) == expectFailure; + } catch (Exception e) { + return false; + } + }, TIMEOUT); + Assert.assertTrue(result); + } + + private ObjectName getMbeanName(String clusterName) throws MalformedObjectNameException { + String clusterBeanName = String.format("%s=%s", CLUSTER_DN_KEY, clusterName); + return new ObjectName( + String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName)); + } + +}