This is an automated email from the ASF dual-hosted git repository. pepperjo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-crail.git
The following commit(s) were added to refs/heads/master by this push: new d07249a [Resource Elasticity] 3/3 Elastic Namenode Service d07249a is described below commit d07249a028818203ef0e07d400c9415f483143de Author: mbrodmann <mbrodm...@student.ethz.ch> AuthorDate: Mon Mar 1 10:43:21 2021 +0100 [Resource Elasticity] 3/3 Elastic Namenode Service * implement simple autoscaling functionality * renaming and small changes * small style changes --- .../java/org/apache/crail/conf/CrailConstants.java | 52 +++++++++ .../java/org/apache/crail/namenode/BlockStore.java | 125 ++++++++++++++++++++- .../org/apache/crail/namenode/DataNodeBlocks.java | 4 + .../crail/namenode/ElasticNameNodeService.java | 19 ++++ .../apache/crail/namenode/FreeCapacityPolicy.java | 75 +++++++++++++ .../org/apache/crail/namenode/NameNodeService.java | 20 ++++ .../org/apache/crail/namenode/PolicyRunner.java | 51 +++++++++ 7 files changed, 343 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/crail/conf/CrailConstants.java b/client/src/main/java/org/apache/crail/conf/CrailConstants.java index 310abc1..0ee25eb 100644 --- a/client/src/main/java/org/apache/crail/conf/CrailConstants.java +++ b/client/src/main/java/org/apache/crail/conf/CrailConstants.java @@ -118,6 +118,25 @@ public class CrailConstants { public static final String STORAGE_KEEPALIVE_KEY = "crail.storage.keepalive"; public static int STORAGE_KEEPALIVE = 2; + // elasticstore + public static final String ELASTICSTORE_SCALEUP_KEY = "crail.elasticstore.scaleup"; + public static double ELASTICSTORE_SCALEUP = 0.4; + + public static final String ELASTICSTORE_SCALEDOWN_KEY = "crail.elasticstore.scaledown"; + public static double ELASTICSTORE_SCALEDOWN = 0.1; + + public static final String ELASTICSTORE_MINNODES_KEY = "crail.elasticstore.minnodes"; + public static int ELASTICSTORE_MINNODES = 1; + + public static final String ELASTICSTORE_MAXNODES_KEY = "crail.elasticstore.maxnodes"; + public static int ELASTICSTORE_MAXNODES = 
10; + + public static final String ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY = "crail.elasticstore.policyrunner.interval"; + public static int ELASTICSTORE_POLICYRUNNER_INTERVAL = 1000; + + public static final String ELASTICSTORE_LOGGING_KEY = "crail.elasticstore.logging"; + public static boolean ELASTICSTORE_LOGGING = false; + public static void updateConstants(CrailConfiguration conf){ //general if (conf.get(DIRECTORY_DEPTH_KEY) != null) { @@ -191,6 +210,9 @@ public class CrailConstants { if (conf.get(NAMENODE_RPC_TYPE_KEY) != null) { NAMENODE_RPC_TYPE = conf.get(NAMENODE_RPC_TYPE_KEY); } + if (conf.get(NAMENODE_RPC_SERVICE_KEY) != null) { + NAMENODE_RPC_SERVICE = conf.get(NAMENODE_RPC_SERVICE_KEY); + } if (conf.get(NAMENODE_LOG_KEY) != null) { NAMENODE_LOG = conf.get(NAMENODE_LOG_KEY); } @@ -210,6 +232,29 @@ public class CrailConstants { if (conf.get(STORAGE_KEEPALIVE_KEY) != null) { STORAGE_KEEPALIVE = Integer.parseInt(conf.get(STORAGE_KEEPALIVE_KEY)); } + + //elasticstore + if (conf.get(ELASTICSTORE_SCALEUP_KEY) != null) { + ELASTICSTORE_SCALEUP = Double.parseDouble(conf.get(ELASTICSTORE_SCALEUP_KEY)); + } + + if (conf.get(ELASTICSTORE_SCALEDOWN_KEY) != null) { + ELASTICSTORE_SCALEDOWN = Double.parseDouble(conf.get(ELASTICSTORE_SCALEDOWN_KEY)); + } + + if (conf.get(ELASTICSTORE_MINNODES_KEY) != null) { + ELASTICSTORE_MINNODES = Integer.parseInt(conf.get(ELASTICSTORE_MINNODES_KEY)); + } + + if (conf.get(ELASTICSTORE_MAXNODES_KEY) != null) { + ELASTICSTORE_MAXNODES = Integer.parseInt(conf.get(ELASTICSTORE_MAXNODES_KEY)); + } + if (conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY) != null) { + ELASTICSTORE_POLICYRUNNER_INTERVAL = Integer.parseInt(conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY)); + } + if (conf.get(ELASTICSTORE_LOGGING_KEY) != null) { + ELASTICSTORE_LOGGING = Boolean.parseBoolean(conf.get(ELASTICSTORE_LOGGING_KEY)); + } } public static void printConf(){ @@ -237,11 +282,18 @@ public class CrailConstants { LOG.info(NAMENODE_BLOCKSELECTION_KEY + " " + 
NAMENODE_BLOCKSELECTION); LOG.info(NAMENODE_FILEBLOCKS_KEY + " " + NAMENODE_FILEBLOCKS); LOG.info(NAMENODE_RPC_TYPE_KEY + " " + NAMENODE_RPC_TYPE); + LOG.info(NAMENODE_RPC_SERVICE_KEY + " " + NAMENODE_RPC_SERVICE); LOG.info(NAMENODE_LOG_KEY + " " + NAMENODE_LOG); LOG.info(STORAGE_TYPES_KEY + " " + STORAGE_TYPES); LOG.info(STORAGE_CLASSES_KEY + " " + STORAGE_CLASSES); LOG.info(STORAGE_ROOTCLASS_KEY + " " + STORAGE_ROOTCLASS); LOG.info(STORAGE_KEEPALIVE_KEY + " " + STORAGE_KEEPALIVE); + LOG.info(ELASTICSTORE_SCALEUP_KEY + " " + ELASTICSTORE_SCALEUP); + LOG.info(ELASTICSTORE_SCALEDOWN_KEY + " " + ELASTICSTORE_SCALEDOWN); + LOG.info(ELASTICSTORE_MAXNODES_KEY + " " + ELASTICSTORE_MAXNODES); + LOG.info(ELASTICSTORE_MINNODES_KEY + " " + ELASTICSTORE_MINNODES); + LOG.info(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY + " " + ELASTICSTORE_POLICYRUNNER_INTERVAL); + LOG.info(ELASTICSTORE_LOGGING_KEY + " " + ELASTICSTORE_LOGGING); } public static void verify() throws IOException { diff --git a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java index fa32ffb..7c616a2 100644 --- a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java +++ b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java @@ -20,7 +20,7 @@ package org.apache.crail.namenode; import java.io.IOException; import java.net.UnknownHostException; -import java.util.ArrayList; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -97,7 +97,7 @@ public class BlockStore { // nevertheless target only one running datanode instance. // Therefore we can iterate over all storageClasses to check whether // the requested datanode is part of one of the storageClasses. 
- for(StorageClass storageClass : storageClasses) { + for (StorageClass storageClass : storageClasses) { if (storageClass.getDataNode(dn) != null) { return storageClass.prepareForRemovalDatanode(dn); } @@ -107,6 +107,85 @@ public class BlockStore { return RpcErrors.ERR_DATANODE_NOT_REGISTERED; } + public double getStorageUsedPercentage() throws Exception { + long total = 0; + long free = 0; + for (StorageClass storageClass : storageClasses) { + total += storageClass.getTotalBlockCount(); + free += storageClass.getFreeBlockCount(); + } + + // if there is no available capacity (i.e. total number of available blocks is 0), + // return 1.0 which tells that all storage is used + if (total != 0) { + double available = (double) free / (double) total; + return 1.0 - available; + } else { + return 1.0; + } + + } + + public long getNumberOfBlocksUsed() throws Exception { + int total = 0; + + for (StorageClass storageClass: storageClasses) { + total += (storageClass.getTotalBlockCount() - storageClass.getFreeBlockCount()); + } + + return total; + } + + public long getNumberOfBlocks() throws Exception { + int total = 0; + + for (StorageClass storageClass: storageClasses) { + total += storageClass.getTotalBlockCount(); + } + + return total; + } + + public int getNumberOfRunningDatanodes() { + int total = 0; + + for (StorageClass storageClass : storageClasses) { + total += storageClass.getNumberOfRunningDatanodes(); + } + + return total; + } + + public DataNodeBlocks identifyRemoveCandidate() { + + ArrayList<DataNodeBlocks> dataNodeBlocks = new ArrayList<DataNodeBlocks>(); + for (StorageClass storageClass : storageClasses) { + dataNodeBlocks.addAll(storageClass.getDataNodeBlocks()); + } + + // sort all datanodes by decreasing number of free blocks (datanode with the most free blocks first) + Collections.sort(dataNodeBlocks, new Comparator<DataNodeBlocks>() { + public int compare(DataNodeBlocks d1, DataNodeBlocks d2) { + if (d1.getBlockCount() < d2.getBlockCount()) { + return 1; + } else if (d1.getBlockCount() > 
d2.getBlockCount()) { + return -1; + } else return 0; + } + }); + + // iterate over datanodes and return first datanode which is not already scheduled for removal + for (DataNodeBlocks candidate: dataNodeBlocks) { + if (!candidate.isScheduleForRemoval()) { + return candidate; + } + } + + // return null if there is no available candidate + return null; + + } + } class StorageClass { @@ -124,6 +203,8 @@ class StorageClass { this.affinitySets = new ConcurrentHashMap<Integer, DataNodeArray>(); if (CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("roundrobin")){ this.blockSelection = new RoundRobinBlockSelection(); + } else if (CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("sequential")) { + this.blockSelection = new SequentialBlockSelection(); } else { this.blockSelection = new RandomBlockSelection(); } @@ -222,6 +303,33 @@ class StorageClass { //--------------- + public long getTotalBlockCount() { + long capacity = 0; + + for (DataNodeBlocks datanode : membership.values()) { + capacity += datanode.getTotalNumberOfBlocks(); + } + + return capacity; + } + + public long getFreeBlockCount() { + long capacity = 0; + + for (DataNodeBlocks datanode : membership.values()) { + capacity += datanode.getBlockCount(); + } + + return capacity; + } + + public Collection<DataNodeBlocks> getDataNodeBlocks() { + return this.membership.values(); + } + + public int getNumberOfRunningDatanodes() { + return this.membership.size(); + } private void _addDataNode(DataNodeBlocks dataNode){ LOG.info("adding datanode " + CrailUtils.getIPAddressFromBytes(dataNode.getIpAddress()) + ":" + dataNode.getPort() + " of type " + dataNode.getStorageType() + " to storage class " + storageClass); @@ -273,7 +381,18 @@ class StorageClass { public int getNext(int size) { return ThreadLocalRandom.current().nextInt(size); } - } + } + + public class SequentialBlockSelection implements BlockSelection { + public SequentialBlockSelection(){ + LOG.info("sequential block selection"); + } + + @Override 
+ public int getNext(int size) { + return 0; + } + } private class DataNodeArray { private ArrayList<DataNodeBlocks> arrayList; diff --git a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java index 58e4ab0..d3d82ea 100644 --- a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java +++ b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java @@ -86,6 +86,10 @@ public class DataNodeBlocks extends DataNodeInfo { return this.scheduleForRemoval; } + public long getTotalNumberOfBlocks() { + return this.maxBlockCount; + } + public int getBlockCount() { return this.freeBlocks.size(); } diff --git a/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java new file mode 100644 index 0000000..745916c --- /dev/null +++ b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java @@ -0,0 +1,19 @@ +package org.apache.crail.namenode; + +import java.io.IOException; + +import org.apache.crail.conf.CrailConstants; + +public class ElasticNameNodeService extends NameNodeService { + + PolicyRunner policyRunner; + + public ElasticNameNodeService() throws IOException { + + this.policyRunner = new FreeCapacityPolicy(this, + CrailConstants.ELASTICSTORE_SCALEUP, + CrailConstants.ELASTICSTORE_SCALEDOWN, + CrailConstants.ELASTICSTORE_MINNODES, + CrailConstants.ELASTICSTORE_MAXNODES); + } +} diff --git a/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java new file mode 100644 index 0000000..70615ac --- /dev/null +++ b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java @@ -0,0 +1,75 @@ +package org.apache.crail.namenode; + +import org.apache.crail.conf.CrailConstants; +import org.apache.crail.rpc.RpcNameNodeService; + +public class 
FreeCapacityPolicy extends PolicyRunner { + + double scaleUp; + double scaleDown; + int minDataNodes; + int maxDataNodes; + int datanodes; // maintains the desired number of datanodes (i.e. a datanode might be still starting / terminating) + boolean updated; // shows whether a launch/terminate datanode operation returned + long lastCapacity; // maintains the capacity that was available when the launch/terminate datanode operation was issued + + FreeCapacityPolicy(RpcNameNodeService service, double scaleUp, double scaleDown, int minDataNodes, int maxDataNodes) { + super(service); + this.scaleUp = scaleUp; + this.scaleDown = scaleDown; + this.minDataNodes = minDataNodes; + this.maxDataNodes = maxDataNodes; + this.datanodes = 0; + this.updated = true; + this.lastCapacity = 0; + } + + @Override + public void checkPolicy() { + + try { + + // log current usage information + double usage = this.service.getStorageUsedPercentage(); + + if (CrailConstants.ELASTICSTORE_LOGGING) { + LOG.info("Current block usage: " + this.service.getNumberOfBlocksUsed() + "/" + this.service.getNumberOfBlocks()); + LOG.info("Current storage usage: " + 100*usage + "%"); + LOG.info("Current number of datanodes: " + this.datanodes); + } + + // check whether datanode launch/terminate operation finished + if (!this.updated && this.lastCapacity != this.service.getNumberOfBlocks()) { + this.updated = true; + } + + // check whether scaling up or down is possible + if (this.updated) { + if (usage < scaleDown && this.datanodes > minDataNodes) { + LOG.info("Scale down detected"); + + DataNodeBlocks removeCandidate = this.service.identifyRemoveCandidate(); + + if (removeCandidate != null) { + this.lastCapacity = this.service.getNumberOfBlocks(); + this.updated = false; + this.service.prepareDataNodeForRemoval(removeCandidate); + this.datanodes--; + } + } + + if (usage > this.scaleUp && this.datanodes < maxDataNodes) { + LOG.info("Scale up detected"); + this.lastCapacity = this.service.getNumberOfBlocks(); 
+ this.updated = false; + launchDatanode(); + this.datanodes++; + } + } + + + } catch (Exception e) { + LOG.error("Unable to retrieve storage usage information"); + } + } +} \ No newline at end of file diff --git a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java index a5da526..beab3f1 100644 --- a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java +++ b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java @@ -552,6 +552,26 @@ public class NameNodeService implements RpcNameNodeService, Sequencer { return RpcErrors.ERR_OK; } + public double getStorageUsedPercentage() throws Exception { + return this.blockStore.getStorageUsedPercentage(); + } + + public long getNumberOfBlocksUsed() throws Exception { + return this.blockStore.getNumberOfBlocksUsed(); + } + + public long getNumberOfBlocks() throws Exception { + return this.blockStore.getNumberOfBlocks(); + } + + public int getNumberOfRunningDatanodes() { + return this.blockStore.getNumberOfRunningDatanodes(); + } + + public DataNodeBlocks identifyRemoveCandidate() { + return this.blockStore.identifyRemoveCandidate(); + } + //------------------------ @Override diff --git a/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java new file mode 100644 index 0000000..8ce0ea2 --- /dev/null +++ b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java @@ -0,0 +1,51 @@ +package org.apache.crail.namenode; + +import org.apache.crail.conf.CrailConstants; +import org.apache.crail.rpc.RpcNameNodeService; +import org.apache.crail.utils.CrailUtils; +import org.slf4j.Logger; + +public abstract class PolicyRunner implements Runnable { + + static final Logger LOG = CrailUtils.getLogger(); + NameNodeService service; + int instances = 0; + + PolicyRunner(RpcNameNodeService service){ + this.service = (NameNodeService) 
service; + Thread runner = new Thread(this); + runner.start(); + } + + public abstract void checkPolicy(); + + public void run() { + + while (true) { + checkPolicy(); + + try { + Thread.sleep(CrailConstants.ELASTICSTORE_POLICYRUNNER_INTERVAL); + } catch(Exception e) { + e.printStackTrace(); + } + } + + } + + public void launchDatanode() { + + try { + String port = Integer.toString(50020+this.instances); + Process p = new ProcessBuilder(System.getenv("CRAIL_HOME") + "/bin/crail", "datanode", "--", "-p" + port).start(); + + LOG.info("Launched new datanode instance"); + this.instances++; + + } catch(Exception e) { + LOG.error("Unable to launch datanode"); + e.printStackTrace(); + } + + } +}