This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit db94b22c3b92b85f150680225b230c9df3882ba5 Author: Neal Sun <[email protected]> AuthorDate: Mon Dec 14 14:00:56 2020 -0800 Task Current State Migration: helix-rest, utils, tests changes (#1579) The first part of the task current state migration. All changes made in this commit are on the controller side and are non-pipeline related. --- .../tools/commandtools/CurrentStateCleanUp.java | 24 +++++++++++++++------- .../src/test/java/org/apache/helix/TestHelper.java | 7 +++++++ .../helix/controller/stages/TestTaskStage.java | 3 ++- .../messaging/handling/MockHelixTaskExecutor.java | 5 +++++ .../resources/helix/PerInstanceAccessor.java | 8 +++++++- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java index c657490..31c3cc7 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java +++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java @@ -19,6 +19,7 @@ package org.apache.helix.tools.commandtools; * under the License. */ +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -124,18 +125,27 @@ public class CurrentStateCleanUp { LOG.info(String.format("Processing cleaning current state for instance: %s", instanceName)); List<String> currentStateNames = accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, session)); - for (String currentStateName : currentStateNames) { - PropertyKey key = - accessor.keyBuilder().currentState(instanceName, session, currentStateName); + List<String> taskCurrentStateNames = + accessor.getChildNames(accessor.keyBuilder().taskCurrentStates(instanceName, session)); + List<PropertyKey> allCurrentStateKeys = new ArrayList<>(); + currentStateNames.stream() + .map(name -> accessor.keyBuilder().currentState(instanceName, session, name)) + .forEach(allCurrentStateKeys::add); + taskCurrentStateNames.stream() + .map(name -> accessor.keyBuilder().taskCurrentState(instanceName, session, name)) + .forEach(allCurrentStateKeys::add); + + List<String> pathsToRemove = new ArrayList<>(); + for (PropertyKey key : allCurrentStateKeys) { accessor.getBaseDataAccessor().update(key.getPath(), updater, AccessOption.PERSISTENT); CurrentState currentState = accessor.getProperty(key); if (currentState.getPartitionStateMap().size() == 0) { - accessor.getBaseDataAccessor().remove(key.getPath(), AccessOption.PERSISTENT); - LOG.info(String.format("Remove current state for instance: %s, resource %s", instanceName, - currentStateName)); + pathsToRemove.add(key.getPath()); + LOG.info(String.format("Remove current state for path %s", key.getPath())); } - } + + accessor.getBaseDataAccessor().remove(pathsToRemove, AccessOption.PERSISTENT); } catch (Exception e) { e.printStackTrace(); } finally { diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java index c805158..79f238d 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java @@ -240,6 +240,13 @@ public class TestHelper { if (curState != null && curState.getRecord().getMapFields().size() != 0) { return false; } + + CurrentState taskCurState = + accessor.getProperty(keyBuilder.taskCurrentState(instanceName, sessionId, resourceName)); + + if (taskCurState != null && taskCurState.getRecord().getMapFields().size() != 0) { + return false; + } } ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName)); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java index 0810e09..b0d8da9 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java @@ -48,7 +48,8 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class TestTaskStage extends TaskTestBase { - private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange); + private ClusterEvent _event = + new ClusterEvent(CLUSTER_NAME, ClusterEventType.TaskCurrentStateChange); private PropertyKey.Builder _keyBuilder; private String _testWorkflow = TestHelper.getTestClassName(); private String _testJobPrefix = _testWorkflow + "_Job_"; diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java index 0b43a90..aa36b4f 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java @@ -57,6 +57,11 @@ public class MockHelixTaskExecutor extends HelixTaskExecutor { PropertyKey.Builder keyBuilder = accessor.keyBuilder(); PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId()); Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path, true); + // Also add the task path + PropertyKey taskPath = + keyBuilder.taskCurrentStates(manager.getInstanceName(), manager.getSessionId()); + Map<String, CurrentState> taskCurrentStateMap = accessor.getChildValuesMap(taskPath, true); + taskCurrentStateMap.forEach(currentStateMap::putIfAbsent); Set<String> seenPartitions = new HashSet<>(); for (Message message : messages) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index 8785796..1560d74 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -386,7 +386,9 @@ public class PerInstanceAccessor extends AbstractHelixResource { List<String> resources = accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId)); - if (resources != null && resources.size() > 0) { + resources.addAll(accessor + .getChildNames(accessor.keyBuilder().taskCurrentStates(instanceName, currentSessionId))); + if (resources.size() > 0) { resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources)); } @@ -409,6 +411,10 @@ public class PerInstanceAccessor extends AbstractHelixResource { String currentSessionId = sessionIds.get(0); CurrentState resourceCurrentState = accessor.getProperty( accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName)); + if (resourceCurrentState == null) { + resourceCurrentState = accessor.getProperty( + accessor.keyBuilder().taskCurrentState(instanceName, currentSessionId, resourceName)); + } if (resourceCurrentState != null) { return JSONRepresentation(resourceCurrentState.getRecord()); }
