This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit cb91ab73b088ad68c5757cff3734d2667f5cb71c Author: Szilard Nemeth <snem...@apache.org> AuthorDate: Fri Jul 12 17:20:42 2019 +0200 YARN-9135. NM State store ResourceMappings serialization are tested with Strings instead of real Device objects. Contributed by Peter Bacsko (cherry picked from commit 8b3c6791b13fc57891cf81e83d4b626b4f2932e6) --- .../resources/numa/NumaResourceAllocation.java | 59 ++++++++++++++-------- .../resources/numa/NumaResourceAllocator.java | 34 ++++++++----- .../recovery/NMLeveldbStateStoreService.java | 5 +- .../recovery/TestNMLeveldbStateStoreService.java | 52 +++++++++++-------- 4 files changed, 91 insertions(+), 59 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java index f8d4739..e91ac3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa; +import com.google.common.collect.ImmutableMap; + import java.io.Serializable; -import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -28,27 +30,18 @@ import java.util.Set; */ public class NumaResourceAllocation implements Serializable { private static final long serialVersionUID = 
6339719798446595123L; - private Map<String, Long> nodeVsMemory; - private Map<String, Integer> nodeVsCpus; + private final ImmutableMap<String, Long> nodeVsMemory; + private final ImmutableMap<String, Integer> nodeVsCpus; - public NumaResourceAllocation() { - nodeVsMemory = new HashMap<>(); - nodeVsCpus = new HashMap<>(); + public NumaResourceAllocation(Map<String, Long> memoryAllocations, + Map<String, Integer> cpuAllocations) { + nodeVsMemory = ImmutableMap.copyOf(memoryAllocations); + nodeVsCpus = ImmutableMap.copyOf(cpuAllocations); } public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId, int cpus) { - this(); - nodeVsMemory.put(memNodeId, memory); - nodeVsCpus.put(cpuNodeId, cpus); - } - - public void addMemoryNode(String memNodeId, long memory) { - nodeVsMemory.put(memNodeId, memory); - } - - public void addCpuNode(String cpuNodeId, int cpus) { - nodeVsCpus.put(cpuNodeId, cpus); + this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus)); } public Set<String> getMemNodes() { @@ -59,11 +52,37 @@ public class NumaResourceAllocation implements Serializable { return nodeVsCpus.keySet(); } - public Map<String, Long> getNodeVsMemory() { + public ImmutableMap<String, Long> getNodeVsMemory() { return nodeVsMemory; } - public Map<String, Integer> getNodeVsCpus() { + public ImmutableMap<String, Integer> getNodeVsCpus() { return nodeVsCpus; } -} + + @Override + public String toString() { + return "NumaResourceAllocation{" + + "nodeVsMemory=" + nodeVsMemory + + ", nodeVsCpus=" + nodeVsCpus + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NumaResourceAllocation that = (NumaResourceAllocation) o; + return Objects.equals(nodeVsMemory, that.nodeVsMemory) && + Objects.equals(nodeVsCpus, that.nodeVsCpus); + } + + @Override + public int hashCode() { + return Objects.hash(nodeVsMemory, nodeVsCpus); + } +} \ No newline 
at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java index e152bda..7b49b1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -247,17 +248,19 @@ public class NumaResourceAllocator { // If there is no single node matched for the container resource // Check the NUMA nodes for Memory resources - NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation(); - long memreq = resource.getMemorySize(); + long memoryRequirement = resource.getMemorySize(); + Map<String, Long> memoryAllocations = Maps.newHashMap(); for (NumaNodeResource numaNode : numaNodesList) { - long memrem = numaNode.assignAvailableMemory(memreq, containerId); - assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem); - memreq = memrem; - if (memreq == 0) { + long memoryRemaining = numaNode. 
+ assignAvailableMemory(memoryRequirement, containerId); + memoryAllocations.put(numaNode.getNodeId(), + memoryRequirement - memoryRemaining); + memoryRequirement = memoryRemaining; + if (memoryRequirement == 0) { break; } } - if (memreq != 0) { + if (memoryRequirement != 0) { LOG.info("There is no available memory:" + resource.getMemorySize() + " in numa nodes for " + containerId); releaseNumaResource(containerId); @@ -265,26 +268,31 @@ public class NumaResourceAllocator { } // Check the NUMA nodes for CPU resources - int cpusreq = resource.getVirtualCores(); + int cpusRequirement = resource.getVirtualCores(); + Map<String, Integer> cpuAllocations = Maps.newHashMap(); for (int index = 0; index < numaNodesList.size(); index++) { NumaNodeResource numaNode = numaNodesList .get((currentAssignNode + index) % numaNodesList.size()); - int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId); - assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem); - cpusreq = cpusrem; - if (cpusreq == 0) { + int cpusRemaining = numaNode. 
+ assignAvailableCpus(cpusRequirement, containerId); + cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining); + cpusRequirement = cpusRemaining; + if (cpusRequirement == 0) { currentAssignNode = (currentAssignNode + index + 1) % numaNodesList.size(); break; } } - if (cpusreq != 0) { + if (cpusRequirement != 0) { LOG.info("There are no available cpus:" + resource.getVirtualCores() + " in numa nodes for " + containerId); releaseNumaResource(containerId); return null; } + + NumaResourceAllocation assignedNumaNodeInfo = + new NumaResourceAllocation(memoryAllocations, cpuAllocations); LOG.info("Assigning multiple NUMA nodes (" + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes()) + ") for memory, (" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 1d7771a..8de94a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -1459,8 +1459,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString() + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { ResourceMappings.AssignedResources res = new ResourceMappings.AssignedResources(); res.updateAssignedResources(assignedResources); @@ -1468,8 +1467,6 @@ 
public class NMLeveldbStateStoreService extends NMStateStoreService { // New value will overwrite old values for the same key batch.put(bytes(keyResChng), res.toBytes()); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { markStoreUnHealthy(e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 87208f7..c4c194c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; @@ -1448,7 +1451,7 @@ public class TestNMLeveldbStateStoreService { @Test public void testStateStoreForResourceMapping() throws IOException { - // test empty when no state + // test that stateStore is initially empty List<RecoveredContainerState> recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); @@ -1464,38 +1467,43 @@ public class TestNMLeveldbStateStoreService { ResourceMappings resourceMappings = new ResourceMappings(); when(container.getResourceMappings()).thenReturn(resourceMappings); - // Store ResourceMapping stateStore.storeAssignedResources(container, "gpu", - Arrays.asList("1", "2", "3")); - // This will overwrite above - List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4"); + Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2), + new GpuDevice(3, 3))); + + // This will overwrite the above + List<Serializable> gpuRes1 = Arrays.asList( + new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4)); stateStore.storeAssignedResources(container, "gpu", gpuRes1); - List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6"); + + List<Serializable> fpgaRes = Arrays.asList( + new FpgaDevice("testType", 3, 3, "testIPID"), + new FpgaDevice("testType", 4, 4, "testIPID"), + new FpgaDevice("testType", 5, 5, "testIPID"), + new FpgaDevice("testType", 6, 6, "testIPID")); stateStore.storeAssignedResources(container, "fpga", fpgaRes); - List<Serializable> numaRes = Arrays.asList("numa1"); + + List<Serializable> numaRes = Arrays.asList( + new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10)); stateStore.storeAssignedResources(container, "numa", numaRes); - // add a invalid key restartStateStore(); recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = 
recoveredContainers.get(0); - List<Serializable> res = rcs.getResourceMappings() + List<Serializable> resources = rcs.getResourceMappings() .getAssignedResources("gpu"); - Assert.assertTrue(res.equals(gpuRes1)); - Assert.assertTrue( - resourceMappings.getAssignedResources("gpu").equals(gpuRes1)); - - res = rcs.getResourceMappings().getAssignedResources("fpga"); - Assert.assertTrue(res.equals(fpgaRes)); - Assert.assertTrue( - resourceMappings.getAssignedResources("fpga").equals(fpgaRes)); - - res = rcs.getResourceMappings().getAssignedResources("numa"); - Assert.assertTrue(res.equals(numaRes)); - Assert.assertTrue( - resourceMappings.getAssignedResources("numa").equals(numaRes)); + Assert.assertEquals(gpuRes1, resources); + Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu")); + + resources = rcs.getResourceMappings().getAssignedResources("fpga"); + Assert.assertEquals(fpgaRes, resources); + Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga")); + + resources = rcs.getResourceMappings().getAssignedResources("numa"); + Assert.assertEquals(numaRes, resources); + Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa")); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org