This is an automated email from the ASF dual-hosted git repository.

pepperjo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-crail.git


The following commit(s) were added to refs/heads/master by this push:
     new d07249a  [Resource Elasticity] 3/3 Elastic Namenode Service
d07249a is described below

commit d07249a028818203ef0e07d400c9415f483143de
Author: mbrodmann <mbrodm...@student.ethz.ch>
AuthorDate: Mon Mar 1 10:43:21 2021 +0100

    [Resource Elasticity] 3/3 Elastic Namenode Service
    
    * implement simple autoscaling functionality
    
    * renaming and small changes
    
    * small style changes
---
 .../java/org/apache/crail/conf/CrailConstants.java |  52 +++++++++
 .../java/org/apache/crail/namenode/BlockStore.java | 125 ++++++++++++++++++++-
 .../org/apache/crail/namenode/DataNodeBlocks.java  |   4 +
 .../crail/namenode/ElasticNameNodeService.java     |  19 ++++
 .../apache/crail/namenode/FreeCapacityPolicy.java  |  75 +++++++++++++
 .../org/apache/crail/namenode/NameNodeService.java |  20 ++++
 .../org/apache/crail/namenode/PolicyRunner.java    |  51 +++++++++
 7 files changed, 343 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/crail/conf/CrailConstants.java 
b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
index 310abc1..0ee25eb 100644
--- a/client/src/main/java/org/apache/crail/conf/CrailConstants.java
+++ b/client/src/main/java/org/apache/crail/conf/CrailConstants.java
@@ -118,6 +118,25 @@ public class CrailConstants {
        public static final String STORAGE_KEEPALIVE_KEY = 
"crail.storage.keepalive";
        public static int STORAGE_KEEPALIVE = 2;
 
+       // elasticstore
+       public static final String ELASTICSTORE_SCALEUP_KEY = 
"crail.elasticstore.scaleup";
+       public static double ELASTICSTORE_SCALEUP = 0.4;
+
+       public static final String ELASTICSTORE_SCALEDOWN_KEY = 
"crail.elasticstore.scaledown";
+       public static double ELASTICSTORE_SCALEDOWN = 0.1;
+
+       public static final String ELASTICSTORE_MINNODES_KEY = 
"crail.elasticstore.minnodes";
+       public static int ELASTICSTORE_MINNODES = 1;
+       
+       public static final String ELASTICSTORE_MAXNODES_KEY = 
"crail.elasticstore.maxnodes";
+       public static int ELASTICSTORE_MAXNODES = 10;
+
+       public static final String ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY = 
"crail.elasticstore.policyrunner.interval";
+       public static int ELASTICSTORE_POLICYRUNNER_INTERVAL = 1000;
+
+       public static final String ELASTICSTORE_LOGGING_KEY = 
"crail.elasticstore.logging";
+       public static boolean ELASTICSTORE_LOGGING = false;
+
        public static void updateConstants(CrailConfiguration conf){
                //general
                if (conf.get(DIRECTORY_DEPTH_KEY) != null) {
@@ -191,6 +210,9 @@ public class CrailConstants {
                if (conf.get(NAMENODE_RPC_TYPE_KEY) != null) {
                        NAMENODE_RPC_TYPE = conf.get(NAMENODE_RPC_TYPE_KEY);
                }
+               if (conf.get(NAMENODE_RPC_SERVICE_KEY) != null) {
+                       NAMENODE_RPC_SERVICE = 
conf.get(NAMENODE_RPC_SERVICE_KEY);
+               }
                if (conf.get(NAMENODE_LOG_KEY) != null) {
                        NAMENODE_LOG = conf.get(NAMENODE_LOG_KEY);
                }
@@ -210,6 +232,29 @@ public class CrailConstants {
                if (conf.get(STORAGE_KEEPALIVE_KEY) != null) {
                        STORAGE_KEEPALIVE = 
Integer.parseInt(conf.get(STORAGE_KEEPALIVE_KEY));
                }
+
+               //elasticstore
+               if (conf.get(ELASTICSTORE_SCALEUP_KEY) != null) {
+                       ELASTICSTORE_SCALEUP = 
Double.parseDouble(conf.get(ELASTICSTORE_SCALEUP_KEY));
+               }
+
+               if (conf.get(ELASTICSTORE_SCALEDOWN_KEY) != null) {
+                       ELASTICSTORE_SCALEDOWN = 
Double.parseDouble(conf.get(ELASTICSTORE_SCALEDOWN_KEY));
+               }
+
+               if (conf.get(ELASTICSTORE_MINNODES_KEY) != null) {
+                       ELASTICSTORE_MINNODES = 
Integer.parseInt(conf.get(ELASTICSTORE_MINNODES_KEY));
+               }
+
+               if (conf.get(ELASTICSTORE_MAXNODES_KEY) != null) {
+                       ELASTICSTORE_MAXNODES = 
Integer.parseInt(conf.get(ELASTICSTORE_MAXNODES_KEY));
+               }
+               if (conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY) != null) {
+                       ELASTICSTORE_POLICYRUNNER_INTERVAL = 
Integer.parseInt(conf.get(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY));
+               }
+               if (conf.get(ELASTICSTORE_LOGGING_KEY) != null) {
+                       ELASTICSTORE_LOGGING = 
Boolean.parseBoolean(conf.get(ELASTICSTORE_LOGGING_KEY));
+               }
        }
 
        public static void printConf(){
@@ -237,11 +282,18 @@ public class CrailConstants {
                LOG.info(NAMENODE_BLOCKSELECTION_KEY + " " + 
NAMENODE_BLOCKSELECTION);
                LOG.info(NAMENODE_FILEBLOCKS_KEY + " " + NAMENODE_FILEBLOCKS);
                LOG.info(NAMENODE_RPC_TYPE_KEY + " " + NAMENODE_RPC_TYPE);
+               LOG.info(NAMENODE_RPC_SERVICE_KEY + " " + NAMENODE_RPC_SERVICE);
                LOG.info(NAMENODE_LOG_KEY + " " + NAMENODE_LOG);
                LOG.info(STORAGE_TYPES_KEY + " " + STORAGE_TYPES);
                LOG.info(STORAGE_CLASSES_KEY + " " + STORAGE_CLASSES);
                LOG.info(STORAGE_ROOTCLASS_KEY + " " + STORAGE_ROOTCLASS);
                LOG.info(STORAGE_KEEPALIVE_KEY + " " + STORAGE_KEEPALIVE);
+               LOG.info(ELASTICSTORE_SCALEUP_KEY + " " + ELASTICSTORE_SCALEUP);
+               LOG.info(ELASTICSTORE_SCALEDOWN_KEY + " " + 
ELASTICSTORE_SCALEDOWN);
+               LOG.info(ELASTICSTORE_MAXNODES_KEY + " " + 
ELASTICSTORE_MAXNODES);
+               LOG.info(ELASTICSTORE_MINNODES_KEY + " " + 
ELASTICSTORE_MINNODES);
+               LOG.info(ELASTICSTORE_POLICYRUNNER_INTERVAL_KEY + " " + 
ELASTICSTORE_POLICYRUNNER_INTERVAL);
+               LOG.info(ELASTICSTORE_LOGGING_KEY + " " + ELASTICSTORE_LOGGING);
        }
 
        public static void verify() throws IOException {
diff --git a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java 
b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
index fa32ffb..7c616a2 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/BlockStore.java
@@ -20,7 +20,7 @@ package org.apache.crail.namenode;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -97,7 +97,7 @@ public class BlockStore {
                // nevertheless target only one running datanode instance.
                // Therefore we can iterate over all storageClasses to check 
whether
                // the requested datanode is part of one of the storageClasses.
-               for(StorageClass storageClass : storageClasses) {
+               for (StorageClass storageClass : storageClasses) {
                        if (storageClass.getDataNode(dn) != null) {
                                return 
storageClass.prepareForRemovalDatanode(dn);
                        }
@@ -107,6 +107,85 @@ public class BlockStore {
                return RpcErrors.ERR_DATANODE_NOT_REGISTERED;
        }
 
+       public double getStorageUsedPercentage() throws Exception {
+               long total = 0;
+               long free = 0;
+               for (StorageClass storageClass : storageClasses) {
+                       total += storageClass.getTotalBlockCount();
+                       free += storageClass.getFreeBlockCount();
+               }
+
+               // if there is no capacity at all (i.e. the total number of 
blocks is 0),
+               // return 1.0 to signal that all storage is considered used
+               if (total != 0) {
+                       double available = (double) free / (double) total;
+                       return 1.0 - available;
+               } else {
+                       return 1.0;
+               }
+
+       }
+
+       public long getNumberOfBlocksUsed() throws Exception {
+               int total = 0;
+
+               for (StorageClass storageClass: storageClasses) {
+                       total += (storageClass.getTotalBlockCount() - 
storageClass.getFreeBlockCount());
+               }
+
+               return total;
+       }
+
+       public long getNumberOfBlocks() throws Exception {
+               int total = 0;
+
+               for (StorageClass storageClass: storageClasses) {
+                       total += storageClass.getTotalBlockCount();
+               }
+
+               return total;
+       }
+
+       public int getNumberOfRunningDatanodes() {
+               int total = 0;
+
+               for (StorageClass storageClass : storageClasses) {
+                       total += storageClass.getNumberOfRunningDatanodes();
+               }
+
+               return total;
+       }
+
+       public DataNodeBlocks identifyRemoveCandidate() {
+
+               ArrayList<DataNodeBlocks> dataNodeBlocks = new 
ArrayList<DataNodeBlocks>();
+               for (StorageClass storageClass : storageClasses) {
+                       dataNodeBlocks.addAll(storageClass.getDataNodeBlocks());
+               }
+
+               // sort all datanodes by decreasing number of free blocks, so 
the datanode with the most free blocks comes first
+               Collections.sort(dataNodeBlocks, new 
Comparator<DataNodeBlocks>() {
+                       public int compare(DataNodeBlocks d1, DataNodeBlocks 
d2) {
+                               if (d1.getBlockCount() < d2.getBlockCount()) {
+                                       return 1;
+                               } else if (d1.getBlockCount() > 
d2.getBlockCount()) {
+                                       return -1;
+                               } else return 0;
+                       }
+               });
+
+               // iterate over datanodes and return first datanode which is 
not already scheduled for removal
+               for (DataNodeBlocks candidate: dataNodeBlocks) {
+                       if (!candidate.isScheduleForRemoval()) {
+                               return candidate;
+                       }
+               }
+
+               // return null if there is no available candidate
+               return null;
+
+       }
+
 }
 
 class StorageClass {
@@ -124,6 +203,8 @@ class StorageClass {
                this.affinitySets = new ConcurrentHashMap<Integer, 
DataNodeArray>();
                if 
(CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("roundrobin")){
                        this.blockSelection = new RoundRobinBlockSelection();
+               } else if 
(CrailConstants.NAMENODE_BLOCKSELECTION.equalsIgnoreCase("sequential")) {
+                       this.blockSelection = new SequentialBlockSelection();
                } else {
                        this.blockSelection = new RandomBlockSelection();
                }
@@ -222,6 +303,33 @@ class StorageClass {
 
        //---------------
 
+       public long getTotalBlockCount() {
+               long capacity = 0;
+
+               for (DataNodeBlocks datanode : membership.values()) {
+                       capacity += datanode.getTotalNumberOfBlocks();
+               }
+
+               return capacity;
+       }
+
+       public long getFreeBlockCount() {
+               long capacity = 0;
+
+               for (DataNodeBlocks datanode : membership.values()) {
+                       capacity += datanode.getBlockCount();
+               }
+
+               return capacity;
+       }
+
+       public Collection<DataNodeBlocks> getDataNodeBlocks() {
+               return this.membership.values();
+       }
+
+       public int getNumberOfRunningDatanodes() {
+               return this.membership.size();
+       }
 
        private void _addDataNode(DataNodeBlocks dataNode){
                LOG.info("adding datanode " + 
CrailUtils.getIPAddressFromBytes(dataNode.getIpAddress()) + ":" + 
dataNode.getPort() + " of type " + dataNode.getStorageType() + " to storage 
class " + storageClass);
@@ -273,7 +381,18 @@ class StorageClass {
                public int getNext(int size) {
                        return ThreadLocalRandom.current().nextInt(size);
                }
-       }       
+       }
+       
+       public class SequentialBlockSelection implements BlockSelection {
+               public SequentialBlockSelection(){
+                       LOG.info("sequential block selection");
+               }
+
+               @Override
+               public int getNext(int size) {
+                       return 0;
+               }
+       }
        
        private class DataNodeArray {
                private ArrayList<DataNodeBlocks> arrayList;
diff --git 
a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java 
b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
index 58e4ab0..d3d82ea 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/DataNodeBlocks.java
@@ -86,6 +86,10 @@ public class DataNodeBlocks extends DataNodeInfo {
                return this.scheduleForRemoval;
        }
 
+       public long getTotalNumberOfBlocks() {
+               return this.maxBlockCount;
+       }
+
        public int getBlockCount() {
                return this.freeBlocks.size();
        }
diff --git 
a/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java 
b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java
new file mode 100644
index 0000000..745916c
--- /dev/null
+++ 
b/namenode/src/main/java/org/apache/crail/namenode/ElasticNameNodeService.java
@@ -0,0 +1,19 @@
+package org.apache.crail.namenode;
+
+import java.io.IOException;
+
+import org.apache.crail.conf.CrailConstants;
+
+public class ElasticNameNodeService extends NameNodeService {
+
+    PolicyRunner policyRunner;
+
+    public ElasticNameNodeService() throws IOException {
+
+        this.policyRunner = new FreeCapacityPolicy(this,
+                CrailConstants.ELASTICSTORE_SCALEUP,
+                CrailConstants.ELASTICSTORE_SCALEDOWN,
+                CrailConstants.ELASTICSTORE_MINNODES,
+                CrailConstants.ELASTICSTORE_MAXNODES);
+    }
+}
diff --git 
a/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java 
b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java
new file mode 100644
index 0000000..70615ac
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/FreeCapacityPolicy.java
@@ -0,0 +1,75 @@
+package org.apache.crail.namenode;
+
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.RpcNameNodeService;
+
+public class FreeCapacityPolicy extends PolicyRunner {
+
+    double scaleUp;
+    double scaleDown;
+    int minDataNodes;
+    int maxDataNodes;
+    int datanodes; // maintains the desired number of datanodes (i.e. a 
datanode might be still starting / terminating)
+    boolean updated; // false while a launch/terminate datanode operation is 
pending, i.e. until the total number of blocks has changed
+    long lastCapacity; // total number of blocks at the time the last 
launch/terminate datanode operation was issued
+
+    FreeCapacityPolicy(RpcNameNodeService service, double scaleUp, double 
scaleDown, int minDataNodes, int maxDataNodes) {
+        super(service);
+        this.scaleUp = scaleUp;
+        this.scaleDown = scaleDown;
+        this.minDataNodes = minDataNodes;
+        this.maxDataNodes = maxDataNodes;
+        this.datanodes = 0;
+        this.updated = true;
+        this.lastCapacity = 0;
+    }
+
+    @Override
+    public void checkPolicy() {
+
+        try {
+
+            // log current usage information
+            double usage = this.service.getStorageUsedPercentage();
+
+            if (CrailConstants.ELASTICSTORE_LOGGING) {
+                LOG.info("Current block usage: " + 
this.service.getNumberOfBlocksUsed() + "/" + this.service.getNumberOfBlocks());
+                LOG.info("Current storage usage: " + 100*usage + "%");
+                LOG.info("Current number of datanodes: " + this.datanodes);
+            }
+
+            // check whether datanode launch/terminate operation finished
+            if (!this.updated && this.lastCapacity != 
this.service.getNumberOfBlocks()) {
+                this.updated = true;
+            }
+
+            // check whether scaling up or down is possible
+            if (this.updated) {
+                if (usage < scaleDown && this.datanodes > minDataNodes) {
+                    LOG.info("Scale down detected");
+    
+                    DataNodeBlocks removeCandidate = 
this.service.identifyRemoveCandidate();
+    
+                    if (removeCandidate != null) {
+                        this.lastCapacity = this.service.getNumberOfBlocks();
+                        this.updated = false;
+                        
this.service.prepareDataNodeForRemoval(removeCandidate);
+                        this.datanodes--;
+                    }
+                }
+    
+                if (usage > this.scaleUp && this.datanodes < maxDataNodes) {
+                    LOG.info("Scale up detected");
+                    this.lastCapacity = this.service.getNumberOfBlocks();
+                    this.updated = false;
+                    launchDatanode();
+                    this.datanodes++;
+                }
+            }
+
+
+        } catch (Exception e) {
+            LOG.error("Unable to retrieve storage usage information");
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java 
b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
index a5da526..beab3f1 100644
--- a/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
+++ b/namenode/src/main/java/org/apache/crail/namenode/NameNodeService.java
@@ -552,6 +552,26 @@ public class NameNodeService implements 
RpcNameNodeService, Sequencer {
                return RpcErrors.ERR_OK;
        }
 
+       public double getStorageUsedPercentage() throws Exception {
+               return this.blockStore.getStorageUsedPercentage();
+       }
+
+       public long getNumberOfBlocksUsed() throws Exception {
+               return this.blockStore.getNumberOfBlocksUsed();
+       }
+
+       public long getNumberOfBlocks() throws Exception {
+               return this.blockStore.getNumberOfBlocks();
+       }
+
+       public int getNumberOfRunningDatanodes() {
+               return this.blockStore.getNumberOfRunningDatanodes();
+       }
+
+       public DataNodeBlocks identifyRemoveCandidate() {
+               return  this.blockStore.identifyRemoveCandidate();
+       }
+
        //------------------------
        
        @Override
diff --git a/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java 
b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java
new file mode 100644
index 0000000..8ce0ea2
--- /dev/null
+++ b/namenode/src/main/java/org/apache/crail/namenode/PolicyRunner.java
@@ -0,0 +1,51 @@
+package org.apache.crail.namenode;
+
+import org.apache.crail.conf.CrailConstants;
+import org.apache.crail.rpc.RpcNameNodeService;
+import org.apache.crail.utils.CrailUtils;
+import org.slf4j.Logger;
+
+public abstract class PolicyRunner implements Runnable {
+
+    static final Logger LOG = CrailUtils.getLogger();
+    NameNodeService service;
+    int instances = 0;
+
+    PolicyRunner(RpcNameNodeService service){
+        this.service = (NameNodeService) service;
+        Thread runner = new Thread(this);
+        runner.start();
+    }
+
+    public abstract void checkPolicy();
+
+    public void run() {
+
+        while (true) {
+            checkPolicy();
+
+            try {
+                
Thread.sleep(CrailConstants.ELASTICSTORE_POLICYRUNNER_INTERVAL);
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    public void launchDatanode() {
+
+        try {
+            String port = Integer.toString(50020+this.instances);
+            Process p = new ProcessBuilder(System.getenv("CRAIL_HOME") + 
"/bin/crail", "datanode", "--",  "-p" + port).start();
+
+            LOG.info("Launched new datanode instance");
+            this.instances++;
+
+        } catch(Exception e) {
+            LOG.error("Unable to launch datanode");
+            e.printStackTrace();
+        }
+
+    }
+}

Reply via email to