This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f87d94859 Improve setInstanceOperation performance (#3017)
f87d94859 is described below
commit f87d948598b33de0788653c2438307bf089dece6
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Apr 15 09:37:40 2025 -0700
Improve setInstanceOperation performance (#3017)
This change switches to a parallel/async read of all instance configs
via HelixDataAccessor, and avoids calling findInstancesWithMatchingLogicalId
for instance operation transitions that do not require this check.
---
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 9 +-
.../java/org/apache/helix/util/InstanceUtil.java | 179 ++++++++++++++++-----
2 files changed, 142 insertions(+), 46 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index ae914682b..0d72ac4aa 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -211,7 +211,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
List<InstanceConfig> matchingLogicalIdInstances =
- InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor,
clusterName,
+ InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor,
clusterName,
instanceConfig);
if (matchingLogicalIdInstances.size() > 1) {
throw new HelixException(
@@ -224,7 +224,8 @@ public class ZKHelixAdmin implements HelixAdmin {
InstanceConstants.InstanceOperation attemptedInstanceOperation =
instanceConfig.getInstanceOperation().getOperation();
try {
- InstanceUtil.validateInstanceOperationTransition(_configAccessor,
clusterName, instanceConfig,
+ InstanceUtil.validateInstanceOperationTransition(_baseDataAccessor,
clusterName,
+ instanceConfig,
InstanceConstants.InstanceOperation.UNKNOWN,
attemptedInstanceOperation);
} catch (HelixException e) {
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
@@ -616,7 +617,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
List<InstanceConfig> swappingInstances =
- InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor,
clusterName,
+ InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor,
clusterName,
instanceConfig);
if (swappingInstances.size() != 1) {
logger.warn(
@@ -655,7 +656,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
List<InstanceConfig> swappingInstances =
- InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor,
clusterName,
+ InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor,
clusterName,
instanceConfig);
if (swappingInstances.size() != 1) {
logger.warn(
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
index 967d561e7..e632f0857 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
@@ -20,23 +20,25 @@ package org.apache.helix.util;
*/
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.zkclient.DataUpdater;
public class InstanceUtil {
@@ -45,27 +47,43 @@ public class InstanceUtil {
}
// Validators for instance operation transitions
- private static final Function<List<InstanceConfig>, Boolean> ALWAYS_ALLOWED =
- (matchingInstances) -> true;
- private static final Function<List<InstanceConfig>, Boolean>
ALL_MATCHES_ARE_UNKNOWN =
- (matchingInstances) -> matchingInstances.isEmpty() ||
matchingInstances.stream().allMatch(
- instance -> instance.getInstanceOperation().getOperation()
- .equals(InstanceConstants.InstanceOperation.UNKNOWN));
- private static final Function<List<InstanceConfig>, Boolean>
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
- (matchingInstances) -> matchingInstances.isEmpty() ||
matchingInstances.stream().allMatch(
- instance -> instance.getInstanceOperation().getOperation()
- .equals(InstanceConstants.InstanceOperation.UNKNOWN)
- || instance.getInstanceOperation().getOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE));
- private static final Function<List<InstanceConfig>, Boolean>
ANY_MATCH_ENABLE_OR_DISABLE =
- (matchingInstances) -> !matchingInstances.isEmpty() &&
matchingInstances.stream().anyMatch(
- instance -> instance.getInstanceOperation().getOperation()
- .equals(InstanceConstants.InstanceOperation.ENABLE) ||
instance.getInstanceOperation()
-
.getOperation().equals(InstanceConstants.InstanceOperation.DISABLE));
+ private static final InstanceOperationValidator ALWAYS_ALLOWED =
+ (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> true;
+ private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN =
+ (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
+ List<InstanceConfig> matchingInstances =
+ findInstancesWithMatchingLogicalId(baseDataAccessor,
configAccessor, clusterName,
+ instanceConfig);
+ return matchingInstances.isEmpty() ||
matchingInstances.stream().allMatch(
+ instance -> instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN));
+ };
+ private static final InstanceOperationValidator
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
+ (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
+ List<InstanceConfig> matchingInstances =
+ findInstancesWithMatchingLogicalId(baseDataAccessor,
configAccessor, clusterName,
+ instanceConfig);
+ return matchingInstances.isEmpty() ||
matchingInstances.stream().allMatch(instance ->
+ instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN)
+ || instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.EVACUATE));
+ };
+ private static final InstanceOperationValidator ANY_MATCH_ENABLE_OR_DISABLE =
+ (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
+ List<InstanceConfig> matchingInstances =
+ findInstancesWithMatchingLogicalId(baseDataAccessor,
configAccessor, clusterName,
+ instanceConfig);
+ return !matchingInstances.isEmpty() &&
matchingInstances.stream().anyMatch(instance ->
+ instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.ENABLE)
+ || instance.getInstanceOperation().getOperation()
+ .equals(InstanceConstants.InstanceOperation.DISABLE));
+ };
// Validator map for valid instance operation transitions
<currentOperation>:<targetOperation>:<validator>
- private static final ImmutableMap<InstanceConstants.InstanceOperation,
ImmutableMap<InstanceConstants.InstanceOperation,
Function<List<InstanceConfig>, Boolean>>>
- validInstanceOperationTransitions =
+ private static final ImmutableMap<InstanceConstants.InstanceOperation,
ImmutableMap<InstanceConstants.InstanceOperation, InstanceOperationValidator>>
+ VALID_INSTANCE_OPERATION_TRANSITIONS =
ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
// ENABLE and DISABLE can be set to UNKNOWN when matching instance is in
SWAP_IN and set to ENABLE in a transaction.
ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
ALWAYS_ALLOWED,
@@ -100,22 +118,55 @@ public class InstanceUtil {
* @param instanceConfig The current instance configuration
* @param currentOperation The current operation
* @param targetOperation The target operation
+ * @deprecated Use {@link
#validateInstanceOperationTransition(BaseDataAccessor, String, InstanceConfig,
InstanceConstants.InstanceOperation, InstanceConstants.InstanceOperation)}
+ * instead for better performance.
*/
+ @Deprecated
public static void validateInstanceOperationTransition(ConfigAccessor
configAccessor,
String clusterName, InstanceConfig instanceConfig,
InstanceConstants.InstanceOperation currentOperation,
InstanceConstants.InstanceOperation targetOperation) {
- // Check if the current operation and target operation are in the valid
transitions map
- if (!validInstanceOperationTransitions.containsKey(currentOperation)
- ||
!validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation))
{
+
+ validateInstanceOperationTransition(null, configAccessor, clusterName,
instanceConfig,
+ currentOperation, targetOperation);
+ }
+
+ /**
+ * Validates if the transition from the current operation to the target
operation is valid.
+ *
+ * @param baseDataAccessor The BaseDataAccessor instance
+ * @param clusterName The cluster name
+ * @param instanceConfig The current instance configuration
+ * @param currentOperation The current operation
+ * @param targetOperation The target operation
+ */
+ public static void validateInstanceOperationTransition(
+ BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName,
+ InstanceConfig instanceConfig,
+ InstanceConstants.InstanceOperation currentOperation,
+ InstanceConstants.InstanceOperation targetOperation) {
+
+ validateInstanceOperationTransition(baseDataAccessor, null, clusterName,
instanceConfig,
+ currentOperation, targetOperation);
+ }
+
+ private static void validateInstanceOperationTransition(
+ @Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
+ @Nullable ConfigAccessor configAccessor, String clusterName,
InstanceConfig instanceConfig,
+ InstanceConstants.InstanceOperation currentOperation,
+ InstanceConstants.InstanceOperation targetOperation) {
+ ImmutableMap<InstanceConstants.InstanceOperation,
InstanceOperationValidator> transitionMap =
+ VALID_INSTANCE_OPERATION_TRANSITIONS.get(currentOperation);
+
+ if (transitionMap == null || !transitionMap.containsKey(targetOperation)) {
throw new HelixException(
"Invalid instance operation transition from " + currentOperation + "
to "
+ targetOperation);
}
- // Throw exception if the validation fails
- if
(!validInstanceOperationTransitions.get(currentOperation).get(targetOperation)
- .apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName,
instanceConfig))) {
+ InstanceOperationValidator validator = transitionMap.get(targetOperation);
+ if (validator == null || !validator.validate(baseDataAccessor,
configAccessor, clusterName,
+ instanceConfig)) {
throw new HelixException(
"Failed validation for instance operation transition from " +
currentOperation + " to "
+ targetOperation);
@@ -130,6 +181,7 @@ public class InstanceUtil {
* @param instanceConfig The instance configuration to match
* @return A list of matching instances
*/
+ @Deprecated
public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
ConfigAccessor configAccessor, String clusterName, InstanceConfig
instanceConfig) {
String logicalIdKey =
@@ -148,17 +200,57 @@ public class InstanceUtil {
.collect(Collectors.toList());
}
+ /**
+ * Finds the instances that have a matching logical ID with the given
instance.
+ *
+ * @param clusterName The cluster name
+ * @param instanceConfig The instance configuration to match
+ * @return A list of matching instances
+ */
+ public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
+ BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName,
+ InstanceConfig instanceConfig) {
+ HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName,
baseDataAccessor);
+
+ ClusterConfig clusterConfig =
+
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig());
+ String logicalIdKey =
+
ClusterTopologyConfig.createFromClusterConfig(clusterConfig).getEndNodeType();
+
+ List<InstanceConfig> instanceConfigs =
+
helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(),
true);
+
+ // Retrieve and filter instances with matching logical ID
+ return instanceConfigs.stream().filter(potentialInstanceConfig ->
+
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
+ && potentialInstanceConfig.getLogicalId(logicalIdKey)
+
.equals(instanceConfig.getLogicalId(logicalIdKey))).collect(Collectors.toList());
+ }
+
+ private static List<InstanceConfig> findInstancesWithMatchingLogicalId(
+ @Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
+ @Nullable ConfigAccessor configAccessor, String clusterName,
InstanceConfig instanceConfig) {
+ if (baseDataAccessor == null && configAccessor == null) {
+ throw new HelixException(
+ "Both BaseDataAccessor and ConfigAccessor cannot be null at the same
time");
+ }
+
+ return baseDataAccessor != null ?
findInstancesWithMatchingLogicalId(baseDataAccessor,
+ clusterName, instanceConfig)
+ : findInstancesWithMatchingLogicalId(configAccessor, clusterName,
instanceConfig);
+ }
+
/**
* Sets the instance operation for the given instance.
*
* @param configAccessor The ConfigAccessor instance
- * @param baseAccessor The BaseDataAccessor instance
+ * @param baseDataAccessor The BaseDataAccessor instance
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation to set
*/
public static void setInstanceOperation(ConfigAccessor configAccessor,
- BaseDataAccessor<ZNRecord> baseAccessor, String clusterName, String
instanceName,
+ BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName, String
instanceName,
InstanceConfig.InstanceOperation instanceOperation) {
String path = PropertyPathBuilder.instanceConfig(clusterName,
instanceName);
@@ -170,24 +262,22 @@ public class InstanceUtil {
}
// Validate the instance operation transition
- validateInstanceOperationTransition(configAccessor, clusterName,
instanceConfig,
+ validateInstanceOperationTransition(baseDataAccessor, configAccessor,
clusterName,
+ instanceConfig,
instanceConfig.getInstanceOperation().getOperation(),
instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE
: instanceOperation.getOperation());
// Update the instance operation
- boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- if (currentData == null) {
- throw new HelixException("Cluster: " + clusterName + ", instance: "
+ instanceName
- + ", participant config is null");
- }
-
- InstanceConfig config = new InstanceConfig(currentData);
- config.setInstanceOperation(instanceOperation);
- return config.getRecord();
+ boolean succeeded = baseDataAccessor.update(path, currentData -> {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", instance: " +
instanceName
+ + ", participant config is null");
}
+
+ InstanceConfig config = new InstanceConfig(currentData);
+ config.setInstanceOperation(instanceOperation);
+ return config.getRecord();
}, AccessOption.PERSISTENT);
if (!succeeded) {
@@ -195,4 +285,9 @@ public class InstanceUtil {
"Failed to update instance operation. Please check if instance is
disabled.");
}
}
+
+ private interface InstanceOperationValidator {
+ boolean validate(@Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
+ @Nullable ConfigAccessor configAccessor, String clusterName,
InstanceConfig instanceConfig);
+ }
}