YARN-6507. Add support in NodeManager to isolate FPGA devices with CGroups. (Zhankun Tang via wangda)
Change-Id: Ic9afd841805f1035423915a0b0add5f3ba96cf9d Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7225ec0c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7225ec0c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7225ec0c Branch: refs/heads/HDFS-7240 Commit: 7225ec0ceb49ae8f5588484297a20f07ec047420 Parents: 5304698 Author: Wangda Tan <[email protected]> Authored: Fri Dec 1 10:50:49 2017 -0800 Committer: Wangda Tan <[email protected]> Committed: Fri Dec 1 10:50:49 2017 -0800 ---------------------------------------------------------------------- .../yarn/api/records/ResourceInformation.java | 5 +- .../hadoop/yarn/conf/YarnConfiguration.java | 25 +- .../src/main/resources/yarn-default.xml | 42 +- .../linux/privileged/PrivilegedOperation.java | 1 + .../resources/fpga/FpgaResourceAllocator.java | 413 +++++++++++++++++ .../resources/fpga/FpgaResourceHandlerImpl.java | 220 +++++++++ .../resourceplugin/ResourcePluginManager.java | 8 +- .../fpga/AbstractFpgaVendorPlugin.java | 90 ++++ .../resourceplugin/fpga/FpgaDiscoverer.java | 139 ++++++ .../fpga/FpgaNodeResourceUpdateHandler.java | 71 +++ .../resourceplugin/fpga/FpgaResourcePlugin.java | 105 +++++ .../fpga/IntelFpgaOpenclPlugin.java | 396 ++++++++++++++++ .../resources/fpga/TestFpgaResourceHandler.java | 458 +++++++++++++++++++ .../resourceplugin/fpga/TestFpgaDiscoverer.java | 187 ++++++++ 14 files changed, 2155 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index 67592cc..a8198d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -42,6 +42,7 @@ public class ResourceInformation implements Comparable<ResourceInformation> { public static final String MEMORY_URI = "memory-mb"; public static final String VCORES_URI = "vcores"; public static final String GPU_URI = "yarn.io/gpu"; + public static final String FPGA_URI = "yarn.io/fpga"; public static final ResourceInformation MEMORY_MB = ResourceInformation.newInstance(MEMORY_URI, "Mi"); @@ -49,9 +50,11 @@ public class ResourceInformation implements Comparable<ResourceInformation> { ResourceInformation.newInstance(VCORES_URI); public static final ResourceInformation GPUS = ResourceInformation.newInstance(GPU_URI); + public static final ResourceInformation FPGAS = + ResourceInformation.newInstance(FPGA_URI); public static final Map<String, ResourceInformation> MANDATORY_RESOURCES = - ImmutableMap.of(MEMORY_URI, MEMORY_MB, VCORES_URI, VCORES, GPU_URI, GPUS); + ImmutableMap.of(MEMORY_URI, MEMORY_MB, VCORES_URI, VCORES, GPU_URI, GPUS, FPGA_URI, FPGAS); /** * Get the name for the resource. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c1024ea..831abf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1514,13 +1514,36 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT = "http://localhost:3476/v1.0/docker/cli"; + /** + * Prefix for FPGA configurations. Work in progress: This configuration + * parameter may be changed/removed in the future. 
+ */ + @Private + public static final String NM_FPGA_RESOURCE_PREFIX = + NM_RESOURCE_PLUGINS + ".fpga."; + + @Private + public static final String NM_FPGA_ALLOWED_DEVICES = + NM_FPGA_RESOURCE_PREFIX + "allowed-fpga-devices"; + + @Private + public static final String NM_FPGA_PATH_TO_EXEC = + NM_FPGA_RESOURCE_PREFIX + "path-to-discovery-executables"; + + @Private + public static final String NM_FPGA_VENDOR_PLUGIN = + NM_FPGA_RESOURCE_PREFIX + "vendor-plugin.class"; + + @Private + public static final String DEFAULT_NM_FPGA_VENDOR_PLUGIN = + "org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin"; /** NM Webapp address.**/ public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final int DEFAULT_NM_WEBAPP_PORT = 8042; public static final String DEFAULT_NM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_NM_WEBAPP_PORT; - + /** NM Webapp https address.**/ public static final String NM_WEBAPP_HTTPS_ADDRESS = NM_PREFIX + "webapp.https.address"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index dd9c6bd..2550c42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3512,7 +3512,8 @@ <property> <description> Enable additional discovery/isolation of resources on the NodeManager, - split by comma. By default, this is empty. Acceptable values: { "yarn-io/gpu" }. + split by comma. By default, this is empty. 
+ Acceptable values: { "yarn.io/gpu", "yarn.io/fpga" }.
</description> <name>yarn.nodemanager.resource-plugins</name> <value></value> @@ -3559,6 +3560,43 @@ <value>http://localhost:3476/v1.0/docker/cli</value> </property> ->>>>>>> theirs + <property> + <description> + Specify one vendor plugin to handle FPGA device discovery, IP download and configuration. + Only IntelFpgaOpenclPlugin is supported by default. + Currently only one vendor FPGA plugin can be configured per NM, since an end user typically puts + cards from the same vendor in one host. This also simplifies the design. + </description> + <name>yarn.nodemanager.resource-plugins.fpga.vendor-plugin.class</name> + <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin</value> + </property> + + <property> + <description> + When yarn.nodemanager.resource-plugins.fpga.allowed-fpga-devices=auto is specified, + YARN NodeManager needs to run the FPGA discovery binary (currently only + IntelFpgaOpenclPlugin is supported) to get FPGA information. + When the value is empty (default), YARN NodeManager will try to locate the + discovery executable based on the vendor plugin's preference. + </description> + <name>yarn.nodemanager.resource-plugins.fpga.path-to-discovery-executables</name> + <value></value> + </property> + + <property> + <description> + Specify FPGA devices which can be managed by YARN NodeManager, split by comma. + The number of FPGA devices will be reported to RM to make scheduling decisions. + Set to auto to let YARN automatically discover FPGA resources from the + system. + + Manually specify FPGA devices if the admin wants only a subset of FPGA devices managed by YARN. + At present, since only one major number can be configured in c-e.cfg, FPGA devices are + identified by their minor device numbers. A common approach to get the minor + device number of an FPGA is to run "aocl diagnose" and check the uevent with the device name.
+ </description> + <name>yarn.nodemanager.resource-plugins.fpga.allowed-fpga-devices</name> + <value>0,1</value> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java index db0b225..ad8c22f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java @@ -52,6 +52,7 @@ public class PrivilegedOperation { ADD_PID_TO_CGROUP(""), //no CLI switch supported yet. RUN_DOCKER_CMD("--run-docker"), GPU("--module-gpu"), + FPGA("--module-fpga"), LIST_AS_USER(""); //no CLI switch supported yet. 
private final String option; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java new file mode 100644 index 0000000..62dd3c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
+
+
+/**
+ * Tracks the FPGA devices of a NodeManager and assigns them to containers.
+ * Devices are bucketed by a "type" string (the FPGA type supported by a
+ * vendor plugin), and allocation requests name the type they want.
+ * Every method that mutates allocator state is synchronized on this object.
+ */
+public class FpgaResourceAllocator {
+
+  static final Log LOG = LogFactory.getLog(FpgaResourceAllocator.class);
+
+  // Every device registered through addFpga(), whether free or in use.
+  private List<FpgaDevice> allowedFpgas = new LinkedList<>();
+
+  // Key is the resource type of FPGA (vendor plugin supported ID); value is
+  // the devices of that type that are currently free.
+  private LinkedHashMap<String, List<FpgaDevice>> availableFpga = new LinkedHashMap<>();
+
+  // Key is the requestor, aka. container ID; value is the devices it holds.
+  private LinkedHashMap<String, List<FpgaDevice>> usedFpgaByRequestor = new LinkedHashMap<>();
+
+  private Context nmContext;
+
+  @VisibleForTesting
+  public HashMap<String, List<FpgaDevice>> getAvailableFpga() {
+    return availableFpga;
+  }
+
+  @VisibleForTesting
+  public List<FpgaDevice> getAllowedFpga() {
+    return allowedFpgas;
+  }
+
+  public FpgaResourceAllocator(Context ctx) {
+    this.nmContext = ctx;
+  }
+
+  /** @return number of free devices summed over all FPGA types. */
+  @VisibleForTesting
+  public int getAvailableFpgaCount() {
+    int count = 0;
+    for (List<FpgaDevice> l : availableFpga.values()) {
+      count += l.size();
+    }
+    return count;
+  }
+
+  @VisibleForTesting
+  public HashMap<String, List<FpgaDevice>> getUsedFpga() {
+    return usedFpgaByRequestor;
+  }
+
+  /** @return number of devices currently held by all requestors. */
+  @VisibleForTesting
+  public int getUsedFpgaCount() {
+    int count = 0;
+    for (List<FpgaDevice> l : usedFpgaByRequestor.values()) {
+      count += l.size();
+    }
+    return count;
+  }
+
+  /**
+   * Result of an allocation: the devices a container may use and the devices
+   * that must be denied to it. Both lists are immutable snapshots.
+   */
+  public static class FpgaAllocation {
+
+    private List<FpgaDevice> allowed = Collections.emptyList();
+
+    private List<FpgaDevice> denied = Collections.emptyList();
+
+    FpgaAllocation(List<FpgaDevice> allowed, List<FpgaDevice> denied) {
+      if (allowed != null) {
+        this.allowed = ImmutableList.copyOf(allowed);
+      }
+      if (denied != null) {
+        this.denied = ImmutableList.copyOf(denied);
+      }
+    }
+
+    public List<FpgaDevice> getAllowed() {
+      return allowed;
+    }
+
+    public List<FpgaDevice> getDenied() {
+      return denied;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("\nFpgaAllocation\n\tAllowed:\n");
+      for (FpgaDevice device : allowed) {
+        sb.append("\t\t");
+        sb.append(device + "\n");
+      }
+      sb.append("\tDenied\n");
+      for (FpgaDevice device : denied) {
+        sb.append("\t\t");
+        sb.append(device + "\n");
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * One physical FPGA device. Identity (equals/hashCode/compareTo) is
+   * defined by (type, major, minor); the remaining fields are descriptive.
+   */
+  public static class FpgaDevice implements Comparable<FpgaDevice>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Orders by the same fields equals() uses, so compareTo is consistent
+    // with equals as the Comparable contract recommends. Nulls sort first.
+    private static final Comparator<FpgaDevice> ORDER =
+        Comparator.<FpgaDevice, String>comparing(FpgaDevice::getType,
+            Comparator.nullsFirst(Comparator.<String>naturalOrder()))
+            .thenComparing(FpgaDevice::getMajor,
+                Comparator.nullsFirst(Comparator.<Integer>naturalOrder()))
+            .thenComparing(FpgaDevice::getMinor,
+                Comparator.nullsFirst(Comparator.<Integer>naturalOrder()));
+
+    private String type;
+    private Integer major;
+    private Integer minor;
+    // IP file identifier. matrix multiplication for instance
+    private String IPID;
+    // the device name under /dev
+    private String devName;
+    // the alias device name. Intel use acl number acl0 to acl31
+    private String aliasDevName;
+    // lspci output's bus number: 02:00.00 (bus:slot.func)
+    private String busNum;
+    private String temperature;
+    private String cardPowerUsage;
+
+    public String getType() {
+      return type;
+    }
+
+    public Integer getMajor() {
+      return major;
+    }
+
+    public Integer getMinor() {
+      return minor;
+    }
+
+    public String getIPID() {
+      return IPID;
+    }
+
+    public void setIPID(String IPID) {
+      this.IPID = IPID;
+    }
+
+    public String getDevName() {
+      return devName;
+    }
+
+    public void setDevName(String devName) {
+      this.devName = devName;
+    }
+
+    public String getAliasDevName() {
+      return aliasDevName;
+    }
+
+    public void setAliasDevName(String aliasDevName) {
+      this.aliasDevName = aliasDevName;
+    }
+
+    public String getBusNum() {
+      return busNum;
+    }
+
+    public void setBusNum(String busNum) {
+      this.busNum = busNum;
+    }
+
+    public String getTemperature() {
+      return temperature;
+    }
+
+    public String getCardPowerUsage() {
+      return cardPowerUsage;
+    }
+
+    public FpgaDevice(String type, Integer major, Integer minor, String IPID) {
+      this.type = type;
+      this.major = major;
+      this.minor = minor;
+      this.IPID = IPID;
+    }
+
+    public FpgaDevice(String type, Integer major,
+        Integer minor, String IPID, String devName,
+        String aliasDevName, String busNum, String temperature, String cardPowerUsage) {
+      this.type = type;
+      this.major = major;
+      this.minor = minor;
+      this.IPID = IPID;
+      this.devName = devName;
+      this.aliasDevName = aliasDevName;
+      this.busNum = busNum;
+      this.temperature = temperature;
+      this.cardPowerUsage = cardPowerUsage;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      // instanceof is false for null, so no separate null check is needed.
+      if (!(obj instanceof FpgaDevice)) {
+        return false;
+      }
+      FpgaDevice other = (FpgaDevice) obj;
+      // Null-safe so that devices with missing fields do not throw NPE.
+      return Objects.equals(this.type, other.type)
+          && Objects.equals(this.major, other.major)
+          && Objects.equals(this.minor, other.minor);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((type == null) ? 0 : type.hashCode());
+      result = prime * result + ((major == null) ? 0 : major.hashCode());
+      result = prime * result + ((minor == null) ? 0 : minor.hashCode());
+      return result;
+    }
+
+    @Override
+    public int compareTo(FpgaDevice o) {
+      return ORDER.compare(this, o);
+    }
+
+    @Override
+    public String toString() {
+      return "FPGA Device:(Type: " + this.type + ", Major: " +
+          this.major + ", Minor: " + this.minor + ", IPID: " + this.IPID + ")";
+    }
+  }
+
+  /**
+   * Registers devices of the given type as allowed and free. Devices already
+   * registered (by equals) are ignored.
+   */
+  public synchronized void addFpga(String type, List<FpgaDevice> list) {
+    availableFpga.putIfAbsent(type, new LinkedList<>());
+    for (FpgaDevice device : list) {
+      if (!allowedFpgas.contains(device)) {
+        allowedFpgas.add(device);
+        availableFpga.get(type).add(device);
+      }
+    }
+    LOG.info("Add a list of FPGA Devices: " + list);
+  }
+
+  /**
+   * Records that a device held by {@code requestor} has been reprogrammed
+   * with {@code newIPID}. Logs a warning if the requestor does not hold it.
+   */
+  public synchronized void updateFpga(String requestor,
+      FpgaDevice device, String newIPID) {
+    List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
+    // Guard against an unknown requestor instead of throwing NPE.
+    int index = (usedFpgas == null) ? -1 : usedFpgas.indexOf(device);
+    if (-1 != index) {
+      usedFpgas.get(index).setIPID(newIPID);
+      LOG.info("Update IPID to " + newIPID +
+          " for this allocated device:" + device);
+    } else {
+      LOG.warn("Failed to update FPGA due to unknown reason " +
+          "that no record for this allocated device:" + device);
+    }
+  }
+
+  /**
+   * Assign {@link FpgaAllocation} with preferred IPID, if no, with random FPGAs.
+   * At most {@code count} devices are assigned: free devices already
+   * programmed with {@code IPIDPreference} are taken first, then any free
+   * device of the requested type. The denied list contains every allowed
+   * device not assigned to this container, including devices held by other
+   * containers, so that the container cannot reach them.
+   * @param type vendor plugin supported FPGA device type
+   * @param count requested FPGA slot count
+   * @param container container id
+   * @param IPIDPreference allocate slot with this IPID first
+   * @return Instance consists two List of allowed and denied {@link FpgaDevice}
+   * @throws ResourceHandlerException When failed to allocate or write state store
+   * */
+  public synchronized FpgaAllocation assignFpga(String type, long count,
+      Container container, String IPIDPreference) throws ResourceHandlerException {
+    List<FpgaDevice> currentAvailableFpga = availableFpga.get(type);
+    String requestor = container.getContainerId().toString();
+    if (null == currentAvailableFpga) {
+      throw new ResourceHandlerException("No such type of FPGA resource available: " + type);
+    }
+    if (count < 0 || count > currentAvailableFpga.size()) {
+      // Report the per-type availability, which is what was checked.
+      throw new ResourceHandlerException("Invalid FPGA request count or not enough, requested:" +
+          count + ", available:" + currentAvailableFpga.size());
+    }
+    if (count == 0) {
+      // Nothing to assign, but every allowed device must still be denied.
+      return new FpgaAllocation(null, allowedFpgas);
+    }
+
+    List<FpgaDevice> assignedFpgas = new LinkedList<>();
+    // First pass: devices already holding the preferred IP, but never more
+    // than requested. Iterator.remove avoids skipping elements on removal.
+    Iterator<FpgaDevice> it = currentAvailableFpga.iterator();
+    while (assignedFpgas.size() < count && it.hasNext()) {
+      FpgaDevice device = it.next();
+      String ipid = device.getIPID();
+      if (null != ipid && ipid.equalsIgnoreCase(IPIDPreference)) {
+        assignedFpgas.add(device);
+        it.remove();
+      }
+    }
+    // Second pass: top up with any free device of this type; the size check
+    // above guarantees enough remain.
+    while (assignedFpgas.size() < count) {
+      assignedFpgas.add(currentAvailableFpga.remove(0));
+    }
+
+    // Record in state store (count > 0, so something was allocated)
+    try {
+      nmContext.getNMStateStore().storeAssignedResources(container,
+          FPGA_URI, new LinkedList<>(assignedFpgas));
+    } catch (IOException e) {
+      // failed, give the allocation back
+      currentAvailableFpga.addAll(assignedFpgas);
+      throw new ResourceHandlerException(e);
+    }
+
+    // update state store success, update internal used FPGAs
+    usedFpgaByRequestor.putIfAbsent(requestor, new LinkedList<>());
+    usedFpgaByRequestor.get(requestor).addAll(assignedFpgas);
+
+    // Deny everything this container was not given, including devices
+    // currently held by other containers.
+    List<FpgaDevice> deniedFpgas = new LinkedList<>(allowedFpgas);
+    deniedFpgas.removeAll(assignedFpgas);
+    return new FpgaAllocation(assignedFpgas, deniedFpgas);
+  }
+
+  /**
+   * Rebuilds the in-memory assignment for a recovered container from the
+   * resource mappings stored on the container.
+   * @throws ResourceHandlerException if the container is unknown, a mapping
+   *     is not an FpgaDevice, the device is not allowed, or it is already
+   *     assigned to another requestor
+   */
+  public synchronized void recoverAssignedFpgas(ContainerId containerId) throws ResourceHandlerException {
+    Container c = nmContext.getContainers().get(containerId);
+    if (null == c) {
+      throw new ResourceHandlerException(
+          "This shouldn't happen, cannot find container with id="
+              + containerId);
+    }
+
+    for (Serializable fpgaDevice :
+        c.getResourceMappings().getAssignedResources(FPGA_URI)) {
+      if (!(fpgaDevice instanceof FpgaDevice)) {
+        throw new ResourceHandlerException(
+            "Trying to recover allocated FPGA devices, however it"
+                + " is not FpgaDevice type, this shouldn't happen");
+      }
+      FpgaDevice device = (FpgaDevice) fpgaDevice;
+
+      // Make sure it is in allowed FPGA device.
+      if (!allowedFpgas.contains(device)) {
+        throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
+            + " however it is not in allowed device list:" + StringUtils
+            .join(";", allowedFpgas));
+      }
+
+      // Make sure it is not occupied by anybody else
+      for (List<FpgaDevice> held : getUsedFpga().values()) {
+        if (held.contains(device)) {
+          throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
+              + " however it is already assigned to others");
+        }
+      }
+      getUsedFpga().putIfAbsent(containerId.toString(), new LinkedList<>());
+      getUsedFpga().get(containerId.toString()).add(device);
+      // remove it from the available list; tolerate a missing type bucket
+      List<FpgaDevice> available = getAvailableFpga().get(device.getType());
+      if (available != null) {
+        available.remove(device);
+      }
+    }
+  }
+
+  /**
+   * Returns every device held by {@code requestor} to the free pool of its
+   * type and forgets the requestor. A no-op for unknown requestors.
+   */
+  public synchronized void cleanupAssignFpgas(String requestor) {
+    List<FpgaDevice> usedFpgas = usedFpgaByRequestor.remove(requestor);
+    if (usedFpgas != null) {
+      for (FpgaDevice device : usedFpgas) {
+        // Add back to availableFpga; create the bucket instead of throwing
+        // NPE halfway through and leaving state inconsistent.
+        availableFpga.computeIfAbsent(device.getType(), k -> new LinkedList<>())
+            .add(device);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
new file mode 100644
index 0000000..bf3d9b0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
+
+/**
+ * ResourceHandler that isolates FPGA devices for containers using the
+ * devices cgroup controller. Allocation bookkeeping is delegated to
+ * {@link FpgaResourceAllocator}; IP download/programming is delegated to the
+ * configured {@link AbstractFpgaVendorPlugin}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FpgaResourceHandlerImpl implements ResourceHandler {
+
+  static final Log LOG = LogFactory.getLog(FpgaResourceHandlerImpl.class);
+
+  // Container environment variable naming the IP to program onto the devices.
+  private static final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID";
+
+  private AbstractFpgaVendorPlugin vendorPlugin;
+
+  private FpgaResourceAllocator allocator;
+
+  private CGroupsHandler cGroupsHandler;
+
+  public static final String EXCLUDED_FPGAS_CLI_OPTION = "--excluded_fpgas";
+  public static final String CONTAINER_ID_CLI_OPTION = "--container_id";
+  private PrivilegedOperationExecutor privilegedOperationExecutor;
+
+  @VisibleForTesting
+  public FpgaResourceHandlerImpl(Context nmContext,
+      CGroupsHandler cGroupsHandler,
+      PrivilegedOperationExecutor privilegedOperationExecutor,
+      AbstractFpgaVendorPlugin plugin) {
+    this.allocator = new FpgaResourceAllocator(nmContext);
+    this.vendorPlugin = plugin;
+    FpgaDiscoverer.getInstance().setResourceHanderPlugin(vendorPlugin);
+    this.cGroupsHandler = cGroupsHandler;
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+  }
+
+  @VisibleForTesting
+  public FpgaResourceAllocator getFpgaAllocator() {
+    return allocator;
+  }
+
+  /**
+   * @return the IP ID requested through the container's
+   *     REQUESTED_FPGA_IP_ID environment variable, or "" when unset
+   */
+  public String getRequestedIPID(Container container) {
+    String r = container.getLaunchContext().getEnvironment().
+        get(REQUEST_FPGA_IP_ID_KEY);
+    return r == null ? "" : r;
+  }
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration) throws ResourceHandlerException {
+    // The plugin should be initialized by FpgaDiscoverer already
+    if (!vendorPlugin.initPlugin(configuration)) {
+      throw new ResourceHandlerException("FPGA plugin initialization failed", null);
+    }
+    LOG.info("FPGA Plugin bootstrap success.");
+    // Get available devices' minor numbers from toolchain or static configuration
+    List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList = FpgaDiscoverer.getInstance().discover();
+    allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList);
+    this.cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.DEVICES);
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> preStart(Container container) throws ResourceHandlerException {
+    // 1. Get requested FPGA type and count, choose corresponding FPGA plugin(s)
+    // 2. Use allocator.assignFpga(type, count) to get FPGAAllocation
+    // 3. If required, download to ensure IP file exists and configure IP file for all devices
+    List<PrivilegedOperation> ret = new ArrayList<>();
+    String containerIdStr = container.getContainerId().toString();
+    Resource requestedResource = container.getResource();
+
+    // Create device cgroups for the container
+    cGroupsHandler.createCGroup(CGroupsHandler.CGroupController.DEVICES,
+        containerIdStr);
+
+    long deviceCount = requestedResource.getResourceValue(FPGA_URI);
+    LOG.info(containerIdStr + " requested " + deviceCount + " Intel FPGA(s)");
+    String ipFilePath = null;
+    try {
+      String requestedIPID = getRequestedIPID(container);
+
+      // allocate even request 0 FPGA because we need to deny all device numbers for this container
+      FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga(
+          vendorPlugin.getFpgaType(), deviceCount,
+          container, requestedIPID);
+      LOG.info("FpgaAllocation:" + allocation);
+
+      PrivilegedOperation privilegedOperation = new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
+          Arrays.asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
+      if (!allocation.getDenied().isEmpty()) {
+        List<Integer> denied = new ArrayList<>();
+        allocation.getDenied().forEach(device -> denied.add(device.getMinor()));
+        privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_FPGAS_CLI_OPTION,
+            StringUtils.join(",", denied)));
+      }
+      privilegedOperationExecutor.executePrivilegedOperation(privilegedOperation, true);
+
+      if (deviceCount > 0) {
+        /**
+         * We only support flashing one IP for all devices now. If user don't set this
+         * environment variable, we assume that user's application can find the IP file by
+         * itself.
+         * Note that the IP downloading and reprogramming in advance in YARN is not necessary because
+         * the OpenCL application may find the IP file and reprogram device on the fly. But YARN do this
+         * for the containers will achieve the quickest reprogram path
+         *
+         * For instance, REQUESTED_FPGA_IP_ID = "matrix_mul" will make all devices
+         * programmed with matrix multiplication IP
+         *
+         * In the future, we may support "matrix_mul:1,gzip:2" format to support different IP
+         * for different devices
+         *
+         * */
+        ipFilePath = vendorPlugin.downloadIP(requestedIPID, container.getWorkDir(),
+            container.getResourceSet().getLocalizedResources());
+        // Treat a null path like an empty one rather than failing with NPE.
+        if (ipFilePath == null || ipFilePath.isEmpty()) {
+          LOG.warn("FPGA plugin failed to download IP but continue, please check the value of environment variable: " +
+              REQUEST_FPGA_IP_ID_KEY + " if you want yarn to help");
+        } else {
+          LOG.info("IP file path:" + ipFilePath);
+          List<FpgaResourceAllocator.FpgaDevice> allowed = allocation.getAllowed();
+          String majorMinorNumber;
+          for (int i = 0; i < allowed.size(); i++) {
+            majorMinorNumber = allowed.get(i).getMajor() + ":" + allowed.get(i).getMinor();
+            String currentIPID = allowed.get(i).getIPID();
+            if (null != currentIPID &&
+                currentIPID.equalsIgnoreCase(requestedIPID)) {
+              LOG.info("IP already in device \"" + allowed.get(i).getAliasDevName() + "," +
+                  majorMinorNumber + "\", skip reprogramming");
+              continue;
+            }
+            if (vendorPlugin.configureIP(ipFilePath, majorMinorNumber)) {
+              // update the allocator that we update an IP of a device
+              allocator.updateFpga(containerIdStr, allowed.get(i),
+                  requestedIPID);
+              //TODO: update the node constraint label
+            }
+          }
+        }
+      }
+    } catch (ResourceHandlerException re) {
+      allocator.cleanupAssignFpgas(containerIdStr);
+      cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
+          containerIdStr);
+      throw re;
+    } catch (PrivilegedOperationException e) {
+      allocator.cleanupAssignFpgas(containerIdStr);
+      cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, containerIdStr);
+      LOG.warn("Could not update cgroup for container", e);
+      throw new ResourceHandlerException(e);
+    }
+    //isolation operation
+    ret.add(new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        PrivilegedOperation.CGROUP_ARG_PREFIX
+        + cGroupsHandler.getPathForCGroupTasks(
+        CGroupsHandler.CGroupController.DEVICES, containerIdStr)));
+    return ret;
+  }
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) throws ResourceHandlerException {
+    allocator.recoverAssignedFpgas(containerId);
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
+    allocator.cleanupAssignFpgas(containerId.toString());
+    cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
+        containerId.toString());
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
index 73d6038..12d679b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; /** @@ -42,7 +44,7 @@ public class ResourcePluginManager { private static final Logger LOG = LoggerFactory.getLogger(ResourcePluginManager.class); private static final Set<String> SUPPORTED_RESOURCE_PLUGINS = ImmutableSet.of( - GPU_URI); + GPU_URI, FPGA_URI); private Map<String, ResourcePlugin> configuredPlugins = Collections.EMPTY_MAP; @@ -77,6 +79,10 @@ public class ResourcePluginManager { plugin = new GpuResourcePlugin(); } + if (resourceName.equals(FPGA_URI)) { + plugin = new FpgaResourcePlugin(); + } + if (plugin == null) { throw new YarnException( "This shouldn't happen, plugin=" + resourceName http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java new file mode 100644 index 0000000..60ea57c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator; + +import java.util.List; +import java.util.Map; + + +/** + * FPGA plugin interface for vendors to implement. Used by {@link FpgaDiscoverer} and + * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceHandlerImpl} + * to discover devices, download IP files and configure IP onto devices. + * */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AbstractFpgaVendorPlugin extends Configurable { + + /** + * Checks the vendor's toolchain and required environment; returns true on success. + * */ + boolean initPlugin(Configuration conf); + + /** + * Diagnoses the devices using the vendor toolchain, without parsing device information. + * */ + boolean diagnose(int timeout); + + /** + * Discovers the vendor's FPGA devices within an execution time limit. + * @param timeout The time (in milliseconds) within which the vendor plugin should return + * @return The discovered devices, to be added to FpgaResourceAllocator for later scheduling + * */ + List<FpgaResourceAllocator.FpgaDevice> discover(int timeout); + + /** + * All vendor plugins share one {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator}, + * which distinguishes FPGA devices by type, so each vendor plugin must report its type. + * */ + String getFpgaType(); + + /** + * Downloads the required IP file to the given directory. + * It should check whether the IP file has already been downloaded. + * @param id The identifier of the IP file, supplied by the application, e.g.
matrix_multi_v1 + * @param dstDir The directory the plugin should download the IP file to + * @param localizedResources The container's localized resources, which can be searched for the IP file. Keys are + * localized file paths and values are soft link names + * @return The absolute path of the IP file; callers treat an empty string as a failed download + * */ + String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources); + + /** + * Programs the given IP file onto a device. + * @param ipPath The absolute path of the IP file + * @param majorMinorNumber The device, in the format {@code major:minor} + * @return whether the device was configured successfully + * */ + boolean configureIP(String ipPath, String majorMinorNumber); + + @Override + void setConf(Configuration conf); + + @Override + Configuration getConf(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java new file mode 100644 index 0000000..8d32a18 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +/** Singleton that discovers FPGA devices through the configured {@link AbstractFpgaVendorPlugin}. */ +public class FpgaDiscoverer { + + public static final Logger LOG = LoggerFactory.getLogger( + FpgaDiscoverer.class); + + private static FpgaDiscoverer instance; + + private Configuration conf = null; + + private AbstractFpgaVendorPlugin plugin = null; + + private List<FpgaResourceAllocator.FpgaDevice> currentFpgaInfo = null; + + // timeout (ms) passed to the vendor plugin's diagnose/discover calls + private static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000; + + static { + instance = new FpgaDiscoverer(); + } + + public static FpgaDiscoverer getInstance() { + return instance; + } + + @VisibleForTesting + public synchronized static FpgaDiscoverer setInstance(FpgaDiscoverer newInstance) { + instance = newInstance; + return instance; + } + + @VisibleForTesting + public synchronized void
setConf(Configuration conf) { + this.conf = conf; + } + + public synchronized List<FpgaResourceAllocator.FpgaDevice> getCurrentFpgaInfo() { + return currentFpgaInfo; + } + + public synchronized void setResourceHanderPlugin(AbstractFpgaVendorPlugin plugin) { + this.plugin = plugin; + } + + public synchronized boolean diagnose() { + return this.plugin.diagnose(MAX_EXEC_TIMEOUT_MS); + } + + public synchronized void initialize(Configuration conf) throws YarnException { + this.conf = conf; + this.plugin.initPlugin(conf); + // Try to diagnose FPGA + LOG.info("Trying to diagnose FPGA information ..."); + if (!diagnose()) { + LOG.warn("Failed to pass FPGA devices diagnose"); + } + } + + /** + * Gets the devices reported by the vendor plugin, keeping only the minor numbers listed in NM_FPGA_ALLOWED_DEVICES unless it requests automatic discovery (the default when unset); throws ResourceHandlerException if none are found or the value is invalid. + * */ + public synchronized List<FpgaResourceAllocator.FpgaDevice> discover() throws ResourceHandlerException { + List<FpgaResourceAllocator.FpgaDevice> list; + String allowed = this.conf.get(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES); + // Whether devices are configured statically or auto-discovered, we always need + // the vendor plugin to discover them.
For instance, IntelFpgaOpenclPlugin needs to + // set up a mapping of <major:minor> to <aliasDevName> + list = this.plugin.discover(MAX_EXEC_TIMEOUT_MS); + if (null == list || list.isEmpty()) { + throw new ResourceHandlerException("No FPGA devices detected!"); + } + currentFpgaInfo = list; + if (allowed.equalsIgnoreCase( + YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES)) { + return list; + } else if (allowed.matches("(\\d+,)*\\d+")){ + String[] minors = allowed.split(","); + Iterator<FpgaResourceAllocator.FpgaDevice> iterator = list.iterator(); + // remove the non-configured minor numbers (currentFpgaInfo is this same list, so it is filtered too) + FpgaResourceAllocator.FpgaDevice t; + while (iterator.hasNext()) { + boolean valid = false; + t = iterator.next(); + for (String minorNumber : minors) { + if (t.getMinor().toString().equals(minorNumber)) { + valid = true; + break; + } + } + if (!valid) { + iterator.remove(); + } + } + // warn when the configured minors and the remaining devices differ in count + if (list.size() != minors.length) { + LOG.warn("We continue although there're mistakes in user's configuration " + + YarnConfiguration.NM_FPGA_ALLOWED_DEVICES + + ", user configured:" + allowed + ", while the real:" + list.toString()); + } + } else { + throw new ResourceHandlerException("Invalid value configured for " + + YarnConfiguration.NM_FPGA_ALLOWED_DEVICES + ":\"" + allowed + "\""); + } + return list; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java new file mode 100644 index 0000000..7511d8f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga; + + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; + +public class FpgaNodeResourceUpdateHandler extends NodeResourceUpdaterPlugin { + private static final Logger LOG = LoggerFactory.getLogger( + FpgaNodeResourceUpdateHandler.class); + + /** + * Sets FPGA_URI on {@code res} to the number of devices found by {@link FpgaDiscoverer}; throws YarnException if FPGA_URI is not a configured resource type. + */ + @Override + public void updateConfiguredResource(Resource res) throws YarnException { + LOG.info("Initializing configured FPGA resources for the NodeManager."); + // null until FpgaDiscoverer#discover() has run + List<FpgaResourceAllocator.FpgaDevice> list = FpgaDiscoverer.getInstance().getCurrentFpgaInfo(); + if (null == list || list.isEmpty()) { + LOG.info("Didn't find any usable FPGAs on the NodeManager."); + return; + } + long count = list.size(); + + Map<String, ResourceInformation> configuredResourceTypes = + ResourceUtils.getResourceTypes(); + if (!configuredResourceTypes.containsKey(FPGA_URI)) { + throw new YarnException("Wrong configurations, found " + count + + " usable FPGAs, however " + FPGA_URI + + " resource-type is not configured inside" + + " resource-types.xml, please configure it to enable FPGA feature or" + + " remove " + FPGA_URI + " from " +
YarnConfiguration.NM_RESOURCE_PLUGINS); + } + + res.setResourceValue(FPGA_URI, count); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java new file mode 100644 index 0000000..44d093e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceHandlerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FpgaResourcePlugin implements ResourcePlugin { + private static final Logger LOG = LoggerFactory.getLogger(FpgaResourcePlugin.class); + + private ResourceHandler fpgaResourceHandler = null; + + private AbstractFpgaVendorPlugin vendorPlugin = null; + private FpgaNodeResourceUpdateHandler fpgaNodeResourceUpdateHandler = null; + + private AbstractFpgaVendorPlugin createFpgaVendorPlugin(Configuration conf) { + String vendorPluginClass = conf.get(YarnConfiguration.NM_FPGA_VENDOR_PLUGIN, + YarnConfiguration.DEFAULT_NM_FPGA_VENDOR_PLUGIN); + LOG.info("Using FPGA vendor plugin: " + vendorPluginClass); + try { + Class<?> pluginClazz =
Class.forName(vendorPluginClass); + if (AbstractFpgaVendorPlugin.class.isAssignableFrom(pluginClazz)) { + return (AbstractFpgaVendorPlugin) ReflectionUtils.newInstance(pluginClazz, + conf); + } else { + throw new YarnRuntimeException("Class: " + vendorPluginClass + + " not instance of " + AbstractFpgaVendorPlugin.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate FPGA vendor plugin: " + + vendorPluginClass, e); + } + } + + @Override + public void initialize(Context context) throws YarnException { + // Get vendor plugin from configuration + this.vendorPlugin = createFpgaVendorPlugin(context.getConf()); + FpgaDiscoverer.getInstance().setResourceHanderPlugin(vendorPlugin); + FpgaDiscoverer.getInstance().initialize(context.getConf()); + fpgaNodeResourceUpdateHandler = new FpgaNodeResourceUpdateHandler(); + } + + @Override + public ResourceHandler createResourceHandler( + Context nmContext, CGroupsHandler cGroupsHandler, + PrivilegedOperationExecutor privilegedOperationExecutor) { + if (fpgaResourceHandler == null) { + fpgaResourceHandler = new FpgaResourceHandlerImpl(nmContext, + cGroupsHandler, privilegedOperationExecutor, vendorPlugin); + } + return fpgaResourceHandler; + } + + @Override + public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() { + return fpgaNodeResourceUpdateHandler; + } + + @Override + public void cleanup() throws YarnException { + // No-op: nothing is released on cleanup + } + + @Override + public DockerCommandPlugin getDockerCommandPluginInstance() { + return null; + } + + @Override + public NMResourceInfo getNMResourceInfo() throws YarnException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java ----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java new file mode 100644 index 0000000..f2e82b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Intel FPGA for OpenCL plugin. + * The key points are: + * 1. It uses Intel's toolchain "aocl" to discover devices and reprogram IP onto them + * before container launch, to achieve the quickest reprogramming path + * 2. It avoids reprogramming by maintaining a mapping of device to FPGA IP ID + * 3. It assumes the IP file is distributed to the container directory + */ +public class IntelFpgaOpenclPlugin implements AbstractFpgaVendorPlugin { + public static final Logger LOG = LoggerFactory.getLogger( + IntelFpgaOpenclPlugin.class); + + private boolean initialized = false; + private Configuration conf; + private InnerShellExecutor shell; + + protected static final String DEFAULT_BINARY_NAME = "aocl"; + + protected static final String ALTERAOCLSDKROOT_NAME = "ALTERAOCLSDKROOT"; + + private String pathToExecutable = null; + + // maps a device's major:minor number to its alias device name (acl0-31) + private Map<String, String> aliasMap; + + public IntelFpgaOpenclPlugin() { + this.shell = new InnerShellExecutor(); + } + + public String getDefaultBinaryName() { + return DEFAULT_BINARY_NAME; + } + + public String getDefaultPathToExecutable() { + return System.getenv(ALTERAOCLSDKROOT_NAME); + } + + public static String getDefaultPathEnvName() { + return ALTERAOCLSDKROOT_NAME; + } + + @VisibleForTesting + public String getPathToExecutable() { + return pathToExecutable; + } +
public void setPathToExecutable(String pathToExecutable) { + this.pathToExecutable = pathToExecutable; + } + + @VisibleForTesting + public void setShell(InnerShellExecutor shell) { + this.shell = shell; + } + + public Map<String, String> getAliasMap() { + return aliasMap; + } + + /** + * Locates aocl (configured path, then <preferred path>/bin, then the bare binary name) and runs a diagnose; returns whether it passed. + * */ + @Override + public boolean initPlugin(Configuration conf) { + this.aliasMap = new HashMap<>(); + if (this.initialized) { + return true; + } + // Find the proper toolchain, mainly aocl + String pluginDefaultBinaryName = getDefaultBinaryName(); + String pathToExecutable = conf.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC, + ""); + if (pathToExecutable.isEmpty()) { + pathToExecutable = pluginDefaultBinaryName; + } + // Validate file existence + File binaryPath = new File(pathToExecutable); + if (!binaryPath.exists()) { + // Configured binary not found: fall back to the plugin's preferred path + LOG.warn("Failed to find FPGA discoverer executable configured in " + + YarnConfiguration.NM_FPGA_PATH_TO_EXEC + + ", please check! Try default path"); + pathToExecutable = pluginDefaultBinaryName; + // Try to find in plugin's preferred path + String pluginDefaultPreferredPath = getDefaultPathToExecutable(); + if (null == pluginDefaultPreferredPath) { + LOG.warn("Failed to find FPGA discoverer executable from system environment " + + getDefaultPathEnvName()+ + ", please check your environment!"); + } else { + binaryPath = new File(pluginDefaultPreferredPath + "/bin", pluginDefaultBinaryName); + if (binaryPath.exists()) { + pathToExecutable = binaryPath.getAbsolutePath(); // the aocl binary itself, not the SDK root directory + } else { + pathToExecutable = pluginDefaultBinaryName; + LOG.warn("Failed to find FPGA discoverer executable in " + + pluginDefaultPreferredPath + ", file doesn't exist!
Use default binary: " + pathToExecutable); + } + } + } + setPathToExecutable(pathToExecutable); + if (!diagnose(10*1000)) { + LOG.warn("Intel FPGA for OpenCL diagnose failed!"); + this.initialized = false; + } else { + this.initialized = true; + } + return this.initialized; + } + + @Override + public List<FpgaResourceAllocator.FpgaDevice> discover(int timeout) { + List<FpgaResourceAllocator.FpgaDevice> list = new LinkedList<>(); + String output; + output = getDiagnoseInfo(timeout); + if (null == output) { + return list; + } + parseDiagnoseInfo(output, list); + return list; + } + + public static class InnerShellExecutor { + + // ls /dev/<devName> + // return a string in format <major:minor> + public String getMajorAndMinorNumber(String devName) { + String output = null; + Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor( + new String[]{"stat", "-c", "%t:%T", "/dev/" + devName}); + try { + LOG.debug("Get FPGA major-minor numbers from /dev/" + devName); + shexec.execute(); + String[] strs = shexec.getOutput().trim().split(":"); + LOG.debug("stat output:" + shexec.getOutput()); + output = Integer.parseInt(strs[0], 16) + ":" + Integer.parseInt(strs[1], 16); + } catch (IOException e) { + String msg = + "Failed to get major-minor number from reading /dev/" + devName; + LOG.warn(msg); + LOG.debug("Command output:" + shexec.getOutput() + ", exit code:" + + shexec.getExitCode()); + } + return output; + } + + public String runDiagnose(String binary, int timeout) { + String output = null; + Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor( + new String[]{binary, "diagnose"}); + try { + shexec.execute(); + } catch (IOException e) { + // aocl diagnose exit code is 1 even it success.
+ // we ignore it because we only wants the output + String msg = + "Failed to execute " + binary + " diagnose, exception message:" + e + .getMessage() +", output:" + output + ", continue ..."; + LOG.warn(msg); + LOG.debug(shexec.getOutput()); + } + return shexec.getOutput(); + } + + } + + /** + * One real sample output of Intel FPGA SDK 17.0's "aocl diagnose" is as below: + * " + * aocl diagnose: Running diagnose from /home/fpga/intelFPGA_pro/17.0/hld/board/nalla_pcie/linux64/libexec + * + * ------------------------- acl0 ------------------------- + * Vendor: Nallatech ltd + * + * Phys Dev Name Status Information + * + * aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0) + * PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8 + * FPGA temperature = 54.4 degrees C. + * Total Card Power Usage = 31.7 Watts. + * Device Power Usage = 0.0 Watts. + * + * DIAGNOSTIC_PASSED + * --------------------------------------------------------- + * " + * + * While per Intel's guide, the output(should be outdated or prior SDK version's) is as below: + * + * " + * aocl diagnose: Running diagnostic from ALTERAOCLSDKROOT/board/<board_name>/ + * <platform>/libexec + * Verified that the kernel mode driver is installed on the host machine. + * Using board package from vendor: <board_vendor_name> + * Querying information for all supported devices that are installed on the host + * machine ... + * + * device_name Status Information + * + * acl0 Passed <descriptive_board_name> + * PCIe dev_id = <device_ID>, bus:slot.func = 02:00.00, + * at Gen 2 with 8 lanes. + * FPGA temperature=43.0 degrees C. + * acl1 Passed <descriptive_board_name> + * PCIe dev_id = <device_ID>, bus:slot.func = 03:00.00, + * at Gen 2 with 8 lanes. + * FPGA temperature = 35.0 degrees C. 
+ * + * Found 2 active device(s) installed on the host machine, to perform a full + * diagnostic on a specific device, please run aocl diagnose <device_name> + * + * DIAGNOSTIC_PASSED + * " + * But this method only support the first output + * */ + public void parseDiagnoseInfo(String output, List<FpgaResourceAllocator.FpgaDevice> list) { + if (output.contains("DIAGNOSTIC_PASSED")) { + Matcher headerStartMatcher = Pattern.compile("acl[0-31]").matcher(output); + Matcher headerEndMatcher = Pattern.compile("(?i)DIAGNOSTIC_PASSED").matcher(output); + int sectionStartIndex; + int sectionEndIndex; + String aliasName; + while (headerStartMatcher.find()) { + sectionStartIndex = headerStartMatcher.end(); + String section = null; + aliasName = headerStartMatcher.group(); + while (headerEndMatcher.find(sectionStartIndex)) { + sectionEndIndex = headerEndMatcher.start(); + section = output.substring(sectionStartIndex, sectionEndIndex); + break; + } + if (null == section) { + LOG.warn("Unsupported diagnose output"); + return; + } + // devName, \(.*\) + // busNum, bus:slot.func\s=\s.*, + // FPGA temperature\s=\s.* + // Total\sCard\sPower\sUsage\s=\s.* + String[] fieldRegexes = new String[]{"\\(.*\\)\n", "(?i)bus:slot.func\\s=\\s.*,", + "(?i)FPGA temperature\\s=\\s.*", "(?i)Total\\sCard\\sPower\\sUsage\\s=\\s.*"}; + String[] fields = new String[4]; + String tempFieldValue; + for (int i = 0; i < fieldRegexes.length; i++) { + Matcher fieldMatcher = Pattern.compile(fieldRegexes[i]).matcher(section); + if (!fieldMatcher.find()) { + LOG.warn("Couldn't find " + fieldRegexes[i] + " pattern"); + fields[i] = ""; + continue; + } + tempFieldValue = fieldMatcher.group().trim(); + if (i == 0) { + // special case for Device name + fields[i] = tempFieldValue.substring(1, tempFieldValue.length() - 1); + } else { + String ss = tempFieldValue.split("=")[1].trim(); + fields[i] = ss.substring(0, ss.length() - 1); + } + } + String majorMinorNumber = this.shell.getMajorAndMinorNumber(fields[0]); + if 
(null != majorMinorNumber) { + String[] mmn = majorMinorNumber.split(":"); + this.aliasMap.put(majorMinorNumber, aliasName); + list.add(new FpgaResourceAllocator.FpgaDevice(getFpgaType(), + Integer.parseInt(mmn[0]), + Integer.parseInt(mmn[1]), null, + fields[0], aliasName, fields[1], fields[2], fields[3])); + } + }// end while + }// end if + } + + public String getDiagnoseInfo(int timeout) { + return this.shell.runDiagnose(this.pathToExecutable,timeout); + } + + @Override + public boolean diagnose(int timeout) { + String output = getDiagnoseInfo(timeout); + if (null != output && output.contains("DIAGNOSTIC_PASSED")) { + return true; + } + return false; + } + + /** + * this is actually the opencl platform type + * */ + @Override + public String getFpgaType() { + return "IntelOpenCL"; + } + + @Override + public String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources) { + // Assume .aocx IP file is distributed by DS to local dir + String r = ""; + Path path; + LOG.info("Got environment: " + id + ", search IP file in localized resources"); + if (null == id || id.isEmpty()) { + LOG.warn("IP_ID environment is empty, skip downloading"); + return r; + } + if (localizedResources != null) { + for (Map.Entry<Path, List<String>> resourceEntry : + localizedResources.entrySet()) { + path = resourceEntry.getKey(); + LOG.debug("Check:" + path.toUri().toString()); + if (path.getName().toLowerCase().contains(id.toLowerCase()) && path.getName().endsWith(".aocx")) { + r = path.toUri().toString(); + LOG.debug("Found: " + r); + break; + } + } + } else { + LOG.warn("Localized resource is null!"); + } + return r; + } + + /** + * Program one device. 
+ * It's ok for the offline "aocl program" failed because the application will always invoke API to program + * The reason we do offline reprogramming is to make the application's program process faster + * @param ipPath the absolute path to the aocx IP file + * @param majorMinorNumber major:minor string + * @return True or False + * */ + @Override + public boolean configureIP(String ipPath, String majorMinorNumber) { + // perform offline program the IP to get a quickest reprogramming sequence + // we need a mapping of "major:minor" to "acl0" to issue command "aocl program <acl0> <ipPath>" + Shell.ShellCommandExecutor shexec; + String aclName; + aclName = this.aliasMap.get(majorMinorNumber); + shexec = new Shell.ShellCommandExecutor( + new String[]{this.pathToExecutable, "program", aclName, ipPath}); + try { + shexec.execute(); + if (0 == shexec.getExitCode()) { + LOG.debug(shexec.getOutput()); + LOG.info("Intel aocl program " + ipPath + " to " + aclName + " successfully"); + } else { + return false; + } + } catch (IOException e) { + LOG.error("Intel aocl program " + ipPath + " to " + aclName + " failed!"); + e.printStackTrace(); + return false; + } + return true; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return this.conf; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
