Hi all,

I saw a comment in nodemanagerservice.py about moving host registration into
the registerWithClusterManager function. I considered this while coding host
registration, but decided to add a separate registerHost function instead,
because registerWithClusterManager runs in a loop and registration is only
needed when the node manager starts.

But yes, it probably makes sense to implement host registration inside
registerWithClusterManager (inserted before the loop). I have tested the code
below on our cluster and it works (I removed the registerHost function and the
self.registerHost call from nodemanagerservice.py).

BTW: why register with default values of zero? getHostInfo can be used to
retrieve the real host information, and those values can then be stored in the
backend.
def registerWithClusterManager(self):
	cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
	now = datetime.datetime.now()
	version = now.strftime("%Y-%m-%d")
	host = self.vmm.getHostInfo(self)
	cm.registerHost(host.name, host.memory, host.cores, version)
	while True:
		start = time.time()
		try:
			host = self.vmm.getHostInfo(self)
			instances = self.instances.values()
			self.id = cm.registerNodeManager(host, instances)
		except Exception, e:
			self.log.exception('Failed to register with the CM')
		toSleep = start - time.time() + self.registerFrequency
		if (toSleep > 0):
			time.sleep(toSleep)
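(A side note on the loop timing: the toSleep computation keeps registration at a roughly fixed frequency regardless of how long the RPC took. The arithmetic in isolation — the function name is mine, just for illustration:)

```python
def remaining_sleep(start, now, frequency):
    """Seconds to sleep so loop iterations stay `frequency` apart.

    start:     timestamp taken at the top of the loop
    now:       timestamp after the work finished
    frequency: desired seconds between registrations
    """
    to_sleep = start - now + frequency
    # If the work overran the interval, don't sleep at all.
    return to_sleep if to_sleep > 0 else 0
```

So if registering took 2.5s of a 10s interval, the node manager sleeps the remaining 7.5s.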
One more thing: in the s...@soi project [1][2] we use Tashi, and we have added
some functionality to it that the s...@soi framework requires. You can find
some basic information at [3]. We have also published the s...@soi-modified
Tashi code [4] so that s...@soi users have all the components required to run
the framework.

If you find any of this functionality interesting, we would be happy to
prepare patches and help merge them into Tashi.
Best regards,
Miha
[1] http://sourceforge.net/projects/sla-at-soi/
[2] http://sourceforge.net/apps/trac/sla-at-soi/
[3] http://sourceforge.net/apps/trac/sla-at-soi/wiki/SlasoiTashi
[4] https://sla-at-soi.svn.sourceforge.net/svnroot/sla-at-soi/trunk/infrastructure-servicemanager/tashi/
----- Original Message -----
From: [email protected]
To: [email protected]
Sent: Thursday, December 9, 2010 3:41:36 AM
Subject: svn commit: r1043823 - in /incubator/tashi/trunk/src/tashi: client/ clustermanager/ clustermanager/data/ nodemanager/
Author: stroucki
Date: Thu Dec 9 02:41:36 2010
New Revision: 1043823
URL: http://svn.apache.org/viewvc?rev=1043823&view=rev
Log:
=== Changes from Michael Stroucken
- nodemanager.py:
	- get fresh instance data (per Miha Stopar)
=== Changes from Andrew Edmonds and Miha Stopar
- sql.py:
	- self.idLock = threading.Lock() added into __init__
	- registerHost, unregisterHost, getNewId functions added
- datainterface.py:
	- registerHost, unregisterHost added
- fromconfig.py:
	- import os, import ConfigParser
	- the following section added into __init__:
		self.hostLocks = {}
		self.hostLock = threading.Lock()
		self.idLock = threading.Lock()
		if not self.config.has_section("FromConfig"):
			return
	- the following line fixed - instance.id instead of instanceId:
		raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
	- acquireHost was changed:
		- at the beginning the following line was added:
			self.hostLock.acquire()
		- before the acquireLock call the following section was added:
			# hostLocks dict added when registerHost was implemented, otherwise newly added hosts don't have _lock
			self.hostLocks[hostId] = self.hostLocks.get(hostId, threading.Lock())
			host._lock = self.hostLocks[host.id]
	- releaseHost was changed:
		- hostId into host.id (see raise TashiException)
		- self.save() and self.hostLock.release() calls added
	- getHosts was changed:
		- now returns self.hosts instead of self.cleanHosts() (FromConfig does not have a cleanHosts method; it is available only when Pickled overrides it)
	- registerHost, unregisterHost, getNewId, save functions added
- pickled.py:
	- the following section added to __init__:
		self.hostLock = threading.Lock()
		self.hostLocks = {}
		self.idLock = threading.Lock()
- getentoverride.py: registerHost, unregisterHost added
- ldapoverride.py: registerHost, unregisterHost added
- clustermanagerservice.py: registerHost, unregisterHost added
- nodemanagerservice.py:
	- self.registerHost() call added into __init__ (before the threads are started)
	- registerHost added
- tashi-client.py: unregisterHost added to the argLists, convertArgs, description and examples dicts
- rpycservices.py: registerHost and unregisterHost added to the clusterManagerRPCs list
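(The acquireHost change in fromconfig.py boils down to lazily creating a per-host lock under a global dict lock, so that hosts added at runtime by registerHost get a _lock too. A minimal standalone sketch of that pattern, with hypothetical names:)

```python
import threading

class HostLockTable(object):
    """Lazily creates one lock per host id, guarded by a table lock."""
    def __init__(self):
        self._table_lock = threading.Lock()  # guards the dict itself
        self._locks = {}                     # host id -> per-host Lock

    def get(self, host_id):
        # setdefault under the table lock: create the per-host lock
        # on first use, return the same object on every later call.
        with self._table_lock:
            return self._locks.setdefault(host_id, threading.Lock())
```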
Modified:
incubator/tashi/trunk/src/tashi/client/tashi-client.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py
incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Thu Dec 9 02:41:36 2010
@@ -214,6 +214,7 @@ argLists = {
'getMyInstances': [],
'getVmLayout': [],
'vmmSpecificCall': [('instance', checkIid, lambda: requiredArg('instance'), True), ('arg', str, lambda: requiredArg('arg'), True)],
+'unregisterHost': [('hostId', int, lambda: requiredArg('hostId'), True)],
}
# Used to convert the dictionary built from the arguments into an object that can be used by thrift
@@ -229,6 +230,7 @@ convertArgs = {
'pauseVm': '[instance]',
'unpauseVm': '[instance]',
'vmmSpecificCall': '[instance, arg]',
+'unregisterHost' : '[hostId]',
}
# Descriptions
@@ -249,7 +251,8 @@ description = {
'getInstances': 'Gets a list of all VMs in Tashi',
'getMyInstances': 'Utility function that only lists VMs owned by the current user',
'getVmLayout': 'Utility function that displays what VMs are placed on what hosts',
-'vmmSpecificCall': 'Direct access to VMM-specific functionality'
+'vmmSpecificCall': 'Direct access to VMM-specific functionality',
+'unregisterHost' : 'Unregisters host. Registration happens when starting node manager',
}
# Example use strings
@@ -270,7 +273,8 @@ examples = {
'getInstances': [''],
'getMyInstances': [''],
'getVmLayout': [''],
-'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc']
+'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc'],
+'unregisterHost' : ['--hostId 2'],
}
show_hide = []
Modified:
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Thu Dec 9 02:41:36 2010
@@ -254,7 +254,7 @@ class ClusterManagerService(object):
try:
# Prepare the target
 			self.log.info("migrateVm: Calling prepSourceVm on source host %s" % sourceHost.name)
-			self.proxy[sourceHost.name].prepSourceVm(instance)
+			self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
 			self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
 			cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
except Exception, e:
@@ -488,3 +488,16 @@ class ClusterManagerService(object):
 			self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
 		self.data.releaseInstance(instance)
 		return
+
+	def registerHost(self, hostname, memory, cores, version):
+		hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
+		if alreadyRegistered:
+			self.log.info("Host %s is already registered, it was updated now" % hostname)
+		else:
+			self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+		return hostId
+
+	def unregisterHost(self, hostId):
+		self.data.unregisterHost(hostId)
+		self.log.info("Host %s was unregistered" % hostId)
+		return
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/datainterface.py Thu Dec 9 02:41:36 2010
@@ -63,3 +63,9 @@ class DataInterface(object):
def getUser(self, id):
raise NotImplementedError
+
+	def registerHost(self, hostname, memory, cores, version):
+		raise NotImplementedError
+
+	def unregisterHost(self, hostId):
+		raise NotImplementedError
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Thu Dec 9 02:41:36 2010
@@ -16,6 +16,8 @@
# under the License.
import threading
+import os
+import ConfigParser
from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data import DataInterface
@@ -34,6 +36,11 @@ class FromConfig(DataInterface):
self.instanceIdLock = threading.Lock()
self.lockNames[self.instanceIdLock] = "instanceIdLock"
self.maxInstanceId = 1
+ self.hostLocks = {}
+ self.hostLock = threading.Lock()
+ self.idLock = threading.Lock()
+ if not self.config.has_section("FromConfig"):
+ return
for (name, value) in self.config.items("FromConfig"):
name = name.lower()
if (name.startswith("host")):
@@ -114,21 +121,28 @@ class FromConfig(DataInterface):
self.releaseLock(self.instanceLock)
 	def acquireHost(self, hostId):
+		self.hostLock.acquire()
 		host = self.hosts.get(hostId, None)
 		if (host is None):
 			raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (hostId)})
+		# hostLocks dict added when registerHost was implemented, otherwise newly added hosts don't have _lock
+		self.hostLocks[hostId] = self.hostLocks.get(hostId, threading.Lock())
+		host._lock = self.hostLocks[host.id]
 		self.acquireLock(host._lock)
 		return host
+
 	def releaseHost(self, host):
 		try:
 			if (host.id not in self.hosts): # MPR: should never be true, but good to check
-				raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (hostId)})
+				raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})
 		finally:
+			self.save()
 			self.releaseLock(host._lock)
+			self.hostLock.release()
 	def getHosts(self):
-		return self.cleanHosts()
+		return self.hosts
def getHost(self, id):
host = self.hosts.get(id, None)
@@ -156,3 +170,81 @@ class FromConfig(DataInterface):
def getUser(self, id):
return self.users[id]
+
+	def registerHost(self, hostname, memory, cores, version):
+		self.hostLock.acquire()
+		for id in self.hosts.keys():
+			if self.hosts[id].name == hostname:
+				host = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+				self.hosts[id] = host
+				self.save()
+				self.hostLock.release()
+				return id, True
+		id = self.getNewId("hosts")
+		self.hosts[id] = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+		self.save()
+		self.hostLock.release()
+		return id, False
+
+	def unregisterHost(self, hostId):
+		self.hostLock.acquire()
+		del self.hosts[hostId]
+		self.save()
+		self.hostLock.release()
+
+	def getNewId(self, table):
+		""" Generates id for a new object. For example for hosts and users.
+		"""
+		self.idLock.acquire()
+		maxId = 0
+		l = []
+		if(table == "hosts"):
+			for id in self.hosts.keys():
+				l.append(id)
+				if id >= maxId:
+					maxId = id
+		l.sort() # sort to enable comparing with range output
+		# check if some id is released:
+		t = range(maxId + 1)
+		t.remove(0)
+		if l != t and l != []:
+			releasedIds = filter(lambda x : x not in l, t)
+			self.idLock.release()
+			return releasedIds[0]
+		else:
+			self.idLock.release()
+			return maxId + 1
+
+	def save(self):
+		# XXXstroucki: a relative path? Where does it go
+		# and in what order does it get loaded
+		fileName = "./etc/Tashi.cfg"
+ if not os.path.exists(fileName):
+ file = open(fileName, "w")
+ file.write("[FromConfig]")
+ file.close()
+ parser = ConfigParser.ConfigParser()
+ parser.read(fileName)
+
+ if not parser.has_section("FromConfig"):
+ parser.add_section("FromConfig")
+
+ hostsInFile = []
+ for (name, value) in parser.items("FromConfig"):
+ name = name.lower()
+ if (name.startswith("host")):
+ hostsInFile.append(name)
+
+ for h in hostsInFile:
+ parser.remove_option("FromConfig", h)
+
+		for hId in self.hosts.keys():
+			host = self.hosts[hId]
+			hostPresentation = "Host(d={'id':%s,'name':'%s','state':HostState.Normal,'memory':%s,'cores':%s,'version':'%s'})" % (hId, host.name, host.memory, host.cores, host.version)
+			parser.set("FromConfig", "host%s" % hId, hostPresentation)
+
+ with open(fileName, 'wb') as configfile:
+ parser.write(configfile)
+
+
+
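(Aside: the "smallest released id" search in getNewId above, which is duplicated in sql.py, can be stated more compactly. A hypothetical sketch, not part of the patch:)

```python
def new_id(used_ids):
    """Smallest positive integer not in used_ids.

    Same policy as getNewId: reuse a released id if one exists,
    otherwise go one past the current maximum. Ids start at 1.
    """
    used = set(used_ids)
    candidate = 1
    while candidate in used:
        candidate += 1
    return candidate
```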
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py Thu Dec 9 02:41:36 2010
@@ -91,3 +91,10 @@ class GetentOverride(DataInterface):
def getUser(self, id):
self.fetchFromGetent()
return self.users[id]
+
+	def registerHost(self, hostname, memory, cores, version):
+		return self.baseDataObject.registerHost(hostname, memory, cores, version)
+
+	def unregisterHost(self, hostId):
+		return self.baseDataObject.unregisterHost(hostId)
+
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py Thu Dec 9 02:41:36 2010
@@ -104,3 +104,10 @@ class LdapOverride(DataInterface):
def getUser(self, id):
self.fetchFromLdap()
return self.users[id]
+
+	def registerHost(self, hostname, memory, cores, version):
+		return self.baseDataObject.registerHost(hostname, memory, cores, version)
+
+	def unregisterHost(self, hostId):
+		return self.baseDataObject.unregisterHost(hostId)
+
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py Thu Dec 9 02:41:36 2010
@@ -32,6 +32,9 @@ class Pickled(FromConfig):
self.instanceIdLock = threading.Lock()
self.lockNames[self.instanceIdLock] = "instanceIdLock"
self.maxInstanceId = 1
+ self.hostLock = threading.Lock()
+ self.hostLocks = {}
+ self.idLock = threading.Lock()
self.load()
def cleanInstances(self):
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Thu Dec 9 02:41:36 2010
@@ -50,6 +50,7 @@ class SQL(DataInterface):
self.hostLock = threading.Lock()
self.hostLocks = {}
self.maxInstanceId = 1
+ self.idLock = threading.Lock()
self.sqlLock = threading.Lock()
self.verifyStructure()
@@ -258,3 +259,62 @@ class SQL(DataInterface):
r = cur.fetchone()
user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
return user
+
+	def registerHost(self, hostname, memory, cores, version):
+		self.hostLock.acquire()
+		cur = self.executeStatement("SELECT * from hosts")
+		res = cur.fetchall()
+		for r in res:
+			if r[1] == hostname:
+				id = r[0]
+				print "Host %s already registered, update will be done" % id
+				s = ""
+				host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+				l = self.makeHostList(host)
+				for e in range(0, len(self.hostOrder)):
+					s = s + self.hostOrder[e] + "=" + l[e]
+					if (e < len(self.hostOrder)-1):
+						s = s + ", "
+				self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
+				self.hostLock.release()
+				return r[0], True
+		id = self.getNewId("hosts")
+		host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+		l = self.makeHostList(host)
+		self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
+		self.hostLock.release()
+		return id, False
+
+	def unregisterHost(self, hostId):
+		self.hostLock.acquire()
+		cur = self.executeStatement("SELECT * from hosts")
+		res = cur.fetchall()
+		for r in res:
+			if r[0] == hostId:
+				self.executeStatement("DELETE FROM hosts WHERE id = %d" % hostId)
+		self.hostLock.release()
+
+	def getNewId(self, table):
+		""" Generates id for a new object. For example for hosts and users.
+		"""
+		self.idLock.acquire()
+		cur = self.executeStatement("SELECT * from %s" % table)
+		res = cur.fetchall()
+		maxId = 0 # the first id would be 1
+		l = []
+		for r in res:
+			id = r[0]
+			l.append(id)
+			if id >= maxId:
+				maxId = id
+		l.sort() # sort to enable comparing with range output
+		# check if some id is released:
+		t = range(maxId + 1)
+		t.remove(0)
+		if l != t and l != []:
+			releasedIds = filter(lambda x : x not in l, t)
+			self.idLock.release()
+			return releasedIds[0]
+		else:
+			self.idLock.release()
+			return maxId + 1
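(One remark on the sql.py version: building the UPDATE/DELETE statements with % interpolation works here, but DB-API placeholders let the driver handle quoting and escaping. A sketch of the idea, assuming a plain sqlite3 connection rather than Tashi's executeStatement wrapper:)

```python
import sqlite3

def update_host(conn, host_id, hostname, memory, cores, version):
    # Placeholders (?) pass the values separately from the SQL text,
    # so the driver does the quoting instead of string interpolation.
    conn.execute(
        "UPDATE hosts SET name = ?, memory = ?, cores = ?, version = ? "
        "WHERE id = ?",
        (hostname, memory, cores, version, host_id))
    conn.commit()
```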
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL:
http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1043823&r1=1043822&r2=1043823&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Thu Dec 9 02:41:36 2010
@@ -64,6 +64,7 @@ class NodeManagerService(object):
 			if (vmId not in vmList):
 				self.log.warning('vmcontrol backend does not report %d' % (vmId))
 				self.vmStateChange(vmId, None, InstanceState.Exited)
+		self.registerHost()
 		threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
 		threading.Thread(target=self.registerWithClusterManager).start()
 		threading.Thread(target=self.statsThread).start()
@@ -202,7 +203,8 @@ class NodeManagerService(object):
transportCookie = self.vmm.prepReceiveVm(instance, source.name)
return transportCookie
-	def prepSourceVm(self, instance):
+	def prepSourceVm(self, vmId):
+		instance = self.getInstance(vmId)
 		instance.state = InstanceState.MigratePrep
def migrateVmHelper(self, instance, target, transportCookie):
@@ -291,3 +293,13 @@ class NodeManagerService(object):
 			except:
 				self.log.exception('statsThread threw an exception')
 			time.sleep(self.statsInterval)
+
+	def registerHost(self):
+		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+		hostname = socket.gethostname()
+		# populate some defaults
+		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+		memory = 0
+		cores = 0
+		version = "empty"
+		cm.registerHost(hostname, memory, cores, version)