Hi all,

I saw a comment in nodemanagerservice.py about moving host registration into
the registerWithClusterManager function. I considered this when I was coding
host registration, but decided to add a separate registerHost function instead,
because registerWithClusterManager runs in a loop and registration is needed
only once, when the node manager starts.

But yes, it probably makes sense to implement host registration inside the
registerWithClusterManager function (inserted before the loop). I have tested
the code below on our cluster and it works (I removed the registerHost function
and the self.registerHost call from nodemanagerservice.py).

BTW: why are you using default values of zero for registration? getHostInfo
can be used to retrieve the host information, and those values can then be
written to the backend storage.

def registerWithClusterManager(self):
    cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
    now = datetime.datetime.now()
    version = now.strftime("%Y-%m-%d")
    host = self.vmm.getHostInfo(self)
    cm.registerHost(host.name, host.memory, host.cores, version)
    while True:
        start = time.time()
        try:
            host = self.vmm.getHostInfo(self)
            instances = self.instances.values()
            self.id = cm.registerNodeManager(host, instances)
        except Exception, e:
            self.log.exception('Failed to register with the CM')
        toSleep = start - time.time() + self.registerFrequency
        if (toSleep > 0):
            time.sleep(toSleep)
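
To show the intended behaviour in isolation, here is a minimal, self-contained
sketch of the same "register once, then report periodically" pattern. It is not
Tashi code: FakeClusterManager, registerLoop and the iterations parameter are
hypothetical names invented for this illustration, the host information is a
plain dict rather than a Tashi Host object, and the loop is bounded so the
example terminates (the real function loops forever).

```python
import time


class FakeClusterManager(object):
    """Hypothetical stand-in for the CM proxy; it only records calls."""

    def __init__(self):
        self.hostRegistrations = []
        self.heartbeats = 0

    def registerHost(self, name, memory, cores, version):
        self.hostRegistrations.append((name, memory, cores, version))
        return 1

    def registerNodeManager(self, host, instances):
        self.heartbeats += 1
        return 1


def registerLoop(cm, getHostInfo, getInstances, frequency, iterations,
                 version="test"):
    """Register the host once, then send periodic node manager updates."""
    # One-time host registration, done before entering the loop.
    host = getHostInfo()
    cm.registerHost(host["name"], host["memory"], host["cores"], version)
    for _ in range(iterations):
        start = time.time()
        try:
            cm.registerNodeManager(getHostInfo(), getInstances())
        except Exception:
            pass  # the real code logs the failure and tries again next round
        # Sleep only for the part of the period not already spent working.
        toSleep = start - time.time() + frequency
        if toSleep > 0:
            time.sleep(toSleep)


cm = FakeClusterManager()
info = {"name": "node1", "memory": 4096, "cores": 4}
registerLoop(cm, lambda: info, lambda: [], frequency=0.01, iterations=3)
```

After the call, the fake CM has seen exactly one registerHost call carrying the
real memory and core values, followed by one registerNodeManager call per
iteration.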


There is one more thing: in the SLA@SOI project [1][2] we are using Tashi, and
we have added some functionality to it that is required by the SLA@SOI
framework. You can find some basic information at [3]. We have also published
the SLA@SOI-modified Tashi code [4] so that SLA@SOI users have all the
components required to run the framework.

If you find any of this functionality interesting, we will be happy to make
patches and help you merge them into Tashi.

Best regards,
Miha

[1] http://sourceforge.net/projects/sla-at-soi/
[2] http://sourceforge.net/apps/trac/sla-at-soi/
[3] http://sourceforge.net/apps/trac/sla-at-soi/wiki/SlasoiTashi
[4] https://sla-at-soi.svn.sourceforge.net/svnroot/sla-at-soi/trunk/infrastructure-servicemanager/tashi/


----- Original Message -----
From: [email protected]
To: [email protected]
Sent: Thursday, December 9, 2010 3:41:36 AM
Subject: svn commit: r1043823 - in /incubator/tashi/trunk/src/tashi: client/ 
clustermanager/ clustermanager/data/ nodemanager/

Author: stroucki
Date: Thu Dec  9 02:41:36 2010
New Revision: 1043823

URL: http://svn.apache.org/viewvc?rev=1043823&view=rev
Log:
=== Changes from Michael Stroucken
- nodemanager.py
        - get fresh instance data (per Miha Stopar)

=== Changes from Andrew Edmonds and Miha Stopar
- sql.py:
        - self.idLock = threading.Lock() added into __init__
        - registerHost, unregisterHost, getNewId functions added

- datainterface.py:
        - registerHost, unregisterHost added

- fromconfig.py:
        - import os, import ConfigParser
        - the following section added into __init__:
                self.hostLocks = {}
                self.hostLock = threading.Lock()
                self.idLock = threading.Lock()
                if not self.config.has_section("FromConfig"):
                        return
        - the following line fixed - instance.id instead of instanceId:
                raise 
TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - 
%d" % (instance.id)})
        - acquireHost was changed:
                at the beginning the following line was added:
                        self.hostLock.acquire()
                before acquireLock call the following section was added:
                        # hostLocks dict added when registerHost was 
implemented, otherwise newly added hosts don't have _lock
                        self.hostLocks[hostId] = self.hostLocks.get(hostId, 
threading.Lock())
                        host._lock = self.hostLocks[host.id]
        - releaseHost was changed:
                - hostId into host.id (see raise TashiException)
                - self.save() and self.hostLock.release() calls added

        - getHosts was changed:
                - now return self.hosts() instead of return self.cleanHosts() 
(FromConfig does not have cleanHosts method, it is available only when Pickled 
overrides it)
        - registerHost, unregisterHost, getNewId, save functions added

- pickled.py:
        - the following section added to the __init__:
                self.hostLock = threading.Lock()
                self.hostLocks = {}
                self.idLock = threading.Lock()

- getentoverride.py: registerHost, unregisterHost added
- ldapoverride.py: registerHost, unregisterHost added

- clustermanagerservice.py: registerHost, unregisterHost added

- nodemanagerservice.py:
        - self.registerHost() call added into __init__ (before threads)
        - registerHost added

- tashi-client.py: unregisterHost added to the argLists dict, convertArgs dict, 
description dict and to the examples dict

- rpycservices.py: added registerHost and unregisterHost to the 
clusterManagerRPCs list

Modified:
    incubator/tashi/trunk/src/tashi/client/tashi-client.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py

Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Thu Dec  9 02:41:36 
2010
@@ -214,6 +214,7 @@ argLists = {
 'getMyInstances': [],
 'getVmLayout': [],
 'vmmSpecificCall': [('instance', checkIid, lambda: requiredArg('instance'), 
True), ('arg', str, lambda: requiredArg('arg'), True)],
+'unregisterHost': [('hostId', int, lambda: requiredArg('hostId'), True)],
 }
 
 # Used to convert the dictionary built from the arguments into an object that 
can be used by thrift
@@ -229,6 +230,7 @@ convertArgs = {
 'pauseVm': '[instance]',
 'unpauseVm': '[instance]',
 'vmmSpecificCall': '[instance, arg]',
+'unregisterHost' : '[hostId]',
 }
 
 # Descriptions
@@ -249,7 +251,8 @@ description = {
 'getInstances': 'Gets a list of all VMs in Tashi',
 'getMyInstances': 'Utility function that only lists VMs owned by the current 
user',
 'getVmLayout': 'Utility function that displays what VMs are placed on what 
hosts',
-'vmmSpecificCall': 'Direct access to VMM-specific functionality'
+'vmmSpecificCall': 'Direct access to VMM-specific functionality',
+'unregisterHost' : 'Unregisters host. Registration happens when starting node 
manager',
 }
 
 # Example use strings
@@ -270,7 +273,8 @@ examples = {
 'getInstances': [''],
 'getMyInstances': [''],
 'getVmLayout': [''],
-'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar 
--arg stopVnc']
+'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar 
--arg stopVnc'],
+'unregisterHost' : ['--hostId 2'],
 }
 
 show_hide = []

Modified: 
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py 
(original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Thu 
Dec  9 02:41:36 2010
@@ -254,7 +254,7 @@ class ClusterManagerService(object):
                try:
                        # Prepare the target
                        self.log.info("migrateVm: Calling prepSourceVm on 
source host %s" % sourceHost.name)
-                       self.proxy[sourceHost.name].prepSourceVm(instance)
+                       self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
                        self.log.info("migrateVm: Calling prepReceiveVm on 
target host %s" % targetHost.name)
                        cookie = 
self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
                except Exception, e:
@@ -488,3 +488,16 @@ class ClusterManagerService(object):
                                self.stateTransition(instance, 
InstanceState.Activating, InstanceState.Running)
                        self.data.releaseInstance(instance)
                return
+
+        def registerHost(self, hostname, memory, cores, version):
+                hostId, alreadyRegistered = self.data.registerHost(hostname, 
memory, cores, version)
+                if alreadyRegistered:
+                        self.log.info("Host %s is already registered, it was 
updated now" % hostname)
+                else:
+                        self.log.info("A host was registered - hostname: %s, 
version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+                return hostId
+
+        def unregisterHost(self, hostId):
+                self.data.unregisterHost(hostId)
+                self.log.info("Host %s was unregistered" % hostId)
+                return

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py 
(original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py Thu 
Dec  9 02:41:36 2010
@@ -63,3 +63,9 @@ class DataInterface(object):
        
        def getUser(self, id):
                raise NotImplementedError
+               
+       def registerHost(self, hostname, memory, cores, version):
+               raise NotImplementedError
+       
+       def unregisterHost(self, hostId):
+               raise NotImplementedError

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Thu Dec  
9 02:41:36 2010
@@ -16,6 +16,8 @@
 # under the License.    
 
 import threading
+import os
+import ConfigParser
 
 from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data import DataInterface
@@ -34,6 +36,11 @@ class FromConfig(DataInterface):
                self.instanceIdLock = threading.Lock()
                self.lockNames[self.instanceIdLock] = "instanceIdLock"
                self.maxInstanceId = 1
+               self.hostLocks = {}
+               self.hostLock = threading.Lock()
+               self.idLock = threading.Lock()
+               if not self.config.has_section("FromConfig"):
+                       return
                for (name, value) in self.config.items("FromConfig"):
                        name = name.lower()
                        if (name.startswith("host")):
@@ -114,21 +121,28 @@ class FromConfig(DataInterface):
                        self.releaseLock(self.instanceLock)
        
        def acquireHost(self, hostId):
+               self.hostLock.acquire()
                host = self.hosts.get(hostId, None)
                if (host is None):
                        raise 
TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % 
(hostId)})
+               # hostLocks dict added when registerHost was implemented, 
otherwise newly added hosts don't have _lock 
+               self.hostLocks[hostId] = self.hostLocks.get(hostId, 
threading.Lock())
+               host._lock = self.hostLocks[host.id]
                self.acquireLock(host._lock)
                return host
+
        
        def releaseHost(self, host):
                try:
                        if (host.id not in self.hosts): # MPR: should never be 
true, but good to check
-                               raise 
TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % 
(hostId)})
+                               raise 
TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % 
(host.id)})
                finally:
+                       self.save()
                        self.releaseLock(host._lock)
+                       self.hostLock.release()
        
        def getHosts(self):
-               return self.cleanHosts()
+               return self.hosts
        
        def getHost(self, id):
                host = self.hosts.get(id, None)
@@ -156,3 +170,81 @@ class FromConfig(DataInterface):
        
        def getUser(self, id):
                return self.users[id]
+               
+       def registerHost(self, hostname, memory, cores, version):
+               self.hostLock.acquire()
+               for id in self.hosts.keys():
+                       if self.hosts[id].name == hostname:
+                               host = 
Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+                               self.hosts[id] = host
+                               self.save()
+                               self.hostLock.release()
+                               return id, True
+               id = self.getNewId("hosts")
+               self.hosts[id] = 
Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+               self.save()
+               self.hostLock.release()
+               return id, False
+               
+       def unregisterHost(self, hostId):
+               self.hostLock.acquire()
+               del self.hosts[hostId]
+               self.save()
+               self.hostLock.release()
+
+       def getNewId(self, table):
+               """ Generates id for a new object. For example for hosts and 
users.  
+               """
+               self.idLock.acquire()
+               maxId = 0
+               l = []
+               if(table == "hosts"):
+                       for id in self.hosts.keys():
+                               l.append(id)
+                               if id >= maxId:
+                                       maxId = id
+               l.sort() # sort to enable comparing with range output
+               # check if some id is released:
+               t = range(maxId + 1)
+               t.remove(0)
+               if l != t and l != []:
+                       releasedIds = filter(lambda x : x not in l, t)
+                       self.idLock.release()
+                       return releasedIds[0]
+               else:
+                       self.idLock.release()
+                       return maxId + 1
+               
+       def save(self):
+               # XXXstroucki: a relative path? Where does it go
+               # and in what order does it get loaded
+               fileName = "./etc/Tashi.cfg"
+               if not os.path.exists(fileName):
+                       file = open(fileName, "w")
+                       file.write("[FromConfig]")
+                       file.close()    
+               parser = ConfigParser.ConfigParser()
+               parser.read(fileName)
+               
+               if not parser.has_section("FromConfig"):
+                       parser.add_section("FromConfig")
+               
+               hostsInFile = []
+               for (name, value) in parser.items("FromConfig"):
+                       name = name.lower()
+                       if (name.startswith("host")):
+                               hostsInFile.append(name)
+                               
+               for h in hostsInFile:
+                       parser.remove_option("FromConfig", h)
+                       
+               for hId in self.hosts.keys():
+                       host = self.hosts[hId]
+                       hostPresentation = 
"Host(d={'id':%s,'name':'%s','state':HostState.Normal,'memory':%s,'cores':%s,'version':'%s'})"
 % (hId, host.name, host.memory, host.cores, host.version)
+                       parser.set("FromConfig", "host%s" % hId, 
hostPresentation)
+               
+               with open(fileName, 'wb') as configfile:
+                       parser.write(configfile)
+               
+               
+

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py 
(original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py Thu 
Dec  9 02:41:36 2010
@@ -91,3 +91,10 @@ class GetentOverride(DataInterface):
        def getUser(self, id):
                self.fetchFromGetent()
                return self.users[id]
+               
+       def registerHost(self, hostname, memory, cores, version):
+               return self.baseDataObject.registerHost(hostname, memory, 
cores, version)
+       
+       def unregisterHost(self, hostId):
+               return self.baseDataObject.unregisterHost(hostId)
+

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py 
(original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py Thu Dec 
 9 02:41:36 2010
@@ -104,3 +104,10 @@ class LdapOverride(DataInterface):
        def getUser(self, id):
                self.fetchFromLdap()
                return self.users[id]
+               
+       def registerHost(self, hostname, memory, cores, version):
+               return self.baseDataObject.registerHost(hostname, memory, 
cores, version)
+       
+       def unregisterHost(self, hostId):
+               return self.baseDataObject.unregisterHost(hostId)
+

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py Thu Dec  9 
02:41:36 2010
@@ -32,6 +32,9 @@ class Pickled(FromConfig):
                self.instanceIdLock = threading.Lock()
                self.lockNames[self.instanceIdLock] = "instanceIdLock"
                self.maxInstanceId = 1
+               self.hostLock = threading.Lock()
+               self.hostLocks = {}
+               self.idLock = threading.Lock()
                self.load()
        
        def cleanInstances(self):

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Thu Dec  9 
02:41:36 2010
@@ -50,6 +50,7 @@ class SQL(DataInterface):
                self.hostLock = threading.Lock()
                self.hostLocks = {}
                self.maxInstanceId = 1
+               self.idLock = threading.Lock()
                self.sqlLock = threading.Lock()
                self.verifyStructure()
 
@@ -258,3 +259,62 @@ class SQL(DataInterface):
                r = cur.fetchone()
                user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
                return user
+               
+       def registerHost(self, hostname, memory, cores, version):
+               self.hostLock.acquire()
+               cur = self.executeStatement("SELECT * from hosts")
+               res = cur.fetchall()
+               for r in res:
+                       if r[1] == hostname:
+                               id = r[0]
+                               print "Host %s already registered, update will 
be done" % id
+                               s = ""
+                               host = Host(d={'id': id, 'up': 0, 'decayed': 0, 
'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 
'version':version})
+                               l = self.makeHostList(host)
+                               for e in range(0, len(self.hostOrder)):
+                                       s = s + self.hostOrder[e] + "=" + l[e]
+                                       if (e < len(self.hostOrder)-1):
+                                               s = s + ", "
+                               self.executeStatement("UPDATE hosts SET %s 
WHERE id = %d" % (s, id))
+                               self.hostLock.release()
+                               return r[0], True
+               id = self.getNewId("hosts")
+               host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 
'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+               l = self.makeHostList(host)
+               self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, 
%s, %s, %s, %s, %s)" % tuple(l))
+               self.hostLock.release()
+               return id, False
+       
+       def unregisterHost(self, hostId):
+               self.hostLock.acquire()
+               cur = self.executeStatement("SELECT * from hosts")
+               res = cur.fetchall()
+               for r in res:
+                       if r[0] == hostId:
+                               self.executeStatement("DELETE FROM hosts WHERE 
id = %d" % hostId)
+               self.hostLock.release()
+
+       def getNewId(self, table):
+               """ Generates id for a new object. For example for hosts and 
users.  
+               """
+               self.idLock.acquire()
+               cur = self.executeStatement("SELECT * from %s" % table)
+               res = cur.fetchall()
+               maxId = 0 # the first id would be 1
+               l = []
+               for r in res:
+                       id = r[0]
+                       l.append(id)
+                       if id >= maxId:
+                               maxId = id
+               l.sort() # sort to enable comparing with range output
+               # check if some id is released:
+               t = range(maxId + 1)
+               t.remove(0)
+               if l != t and l != []:
+                       releasedIds = filter(lambda x : x not in l, t)
+                       self.idLock.release()
+                       return releasedIds[0]
+               else:
+                       self.idLock.release()
+                       return maxId + 1

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: 
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Thu Dec  
9 02:41:36 2010
@@ -64,6 +64,7 @@ class NodeManagerService(object):
                        if (vmId not in vmList):
                                self.log.warning('vmcontrol backend does not 
report %d' % (vmId))
                                self.vmStateChange(vmId, None, 
InstanceState.Exited)
+               self.registerHost()
                
threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
                threading.Thread(target=self.registerWithClusterManager).start()
                threading.Thread(target=self.statsThread).start()
@@ -202,7 +203,8 @@ class NodeManagerService(object):
                transportCookie = self.vmm.prepReceiveVm(instance, source.name)
                return transportCookie
 
-       def prepSourceVm(self, instance):
+       def prepSourceVm(self, vmId):
+               instance = self.getInstance(vmId)
                instance.state = InstanceState.MigratePrep
        
        def migrateVmHelper(self, instance, target, transportCookie):
@@ -291,3 +293,13 @@ class NodeManagerService(object):
                        except:
                                self.log.exception('statsThread threw an 
exception')
                        time.sleep(self.statsInterval)
+
+        def registerHost(self):
+                cm = ConnectionManager(self.username, self.password, 
self.cmPort)[self.cmHost]
+                hostname = socket.gethostname()
+               # populate some defaults
+               # XXXstroucki: I think it's better if the nodemanager fills 
these in properly when registering with the clustermanager
+               memory = 0
+               cores = 0
+               version = "empty"
+                cm.registerHost(hostname, memory, cores, version)

