Index: src/tashi/nodemanager/vmcontrol/xenpv.py
===================================================================
--- src/tashi/nodemanager/vmcontrol/xenpv.py (revision 792304)
+++ src/tashi/nodemanager/vmcontrol/xenpv.py (working copy)
@@ -25,8 +25,8 @@
import socket
from vmcontrolinterface import VmControlInterface
-from tashi.services.ttypes import Errors, InstanceState, TashiException
-from tashi.services.ttypes import Instance, Host
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
+from tashi.rpycservices.rpyctypes import Instance, Host
from tashi import boolean, convertExceptions, ConnectionManager, version
from tashi.util import isolatedRPC, broken
Index: src/tashi/nodemanager/vmcontrol/qemu.py
===================================================================
--- src/tashi/nodemanager/vmcontrol/qemu.py (revision 792304)
+++ src/tashi/nodemanager/vmcontrol/qemu.py (working copy)
@@ -27,7 +27,7 @@
import sys
import time
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.util import broken, logged, scrubString, boolean
from tashi import version, stringPartition
from vmcontrolinterface import VmControlInterface
Index: src/tashi/nodemanager/nodemanager.py
===================================================================
--- src/tashi/nodemanager/nodemanager.py (revision 792304)
+++ src/tashi/nodemanager/nodemanager.py (working copy)
@@ -20,14 +20,16 @@
import logging.config
import signal
import sys
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
from tashi.util import instantiateImplementation, getConfig, debugConsole,
signalHandler
-from tashi.services import nodemanagerservice, clustermanagerservice
from tashi import ConnectionManager
import tashi
+from tashi import boolean
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
+
@signalHandler(signal.SIGTERM)
def handleSIGTERM(signalNumber, stackFrame):
sys.exit(0)
@@ -45,13 +47,22 @@
vmm = instantiateImplementation(config.get("NodeManager", "vmm"),
config, dfs, None)
service = instantiateImplementation(config.get("NodeManager",
"service"), config, vmm)
vmm.nm = service
- processor = nodemanagerservice.Processor(service)
- transport = TServerSocket(int(config.get('NodeManagerService', 'port')))
- server = TThreadedServer(processor, transport)
+
+ if boolean(config.get("Security", "authAndEncrypt")):
+ users = {}
+ users[config.get('AllowedUsers', 'clusterManagerUser')] =
config.get('AllowedUsers', 'clusterManagerPassword')
+ authenticator = VdbAuthenticator.from_dict(users)
+ t = ThreadedServer(service=rpycservices.ManagerService,
hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')),
auto_register=False, authenticator=authenticator)
+ else:
+ t = ThreadedServer(service=rpycservices.ManagerService,
hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')),
auto_register=False)
+ t.logger.quiet = True
+ t.service.service = service
+ t.service._type = 'NodeManagerService'
+
debugConsole(globals())
try:
- server.serve()
+ t.start()
except KeyboardInterrupt:
handleSIGTERM(signal.SIGTERM, None)
except Exception, e:
Index: src/tashi/nodemanager/nodemanagerservice.py
===================================================================
--- src/tashi/nodemanager/nodemanagerservice.py (revision 792304)
+++ src/tashi/nodemanager/nodemanagerservice.py (working copy)
@@ -22,16 +22,13 @@
import sys
import threading
import time
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from tashi.services.ttypes import Host, HostState, InstanceState,
TashiException, Errors, Instance
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState,
TashiException, Errors, Instance
from tashi.nodemanager import RPC
from tashi import boolean, vmStates, logged, ConnectionManager, timed
import tashi
+
class NodeManagerService(object):
"""RPC handler for the NodeManager
@@ -43,6 +40,13 @@
self.vmm = vmm
self.cmHost = config.get("NodeManagerService",
"clusterManagerHost")
self.cmPort = int(config.get("NodeManagerService",
"clusterManagerPort"))
+ self.authAndEncrypt = boolean(config.get('Security',
'authAndEncrypt'))
+ if self.authAndEncrypt:
+ self.username = config.get('AccessClusterManager',
'username')
+ self.password = config.get('AccessClusterManager',
'password')
+ else:
+ self.username = None
+ self.password = None
self.log = logging.getLogger(__file__)
self.convertExceptions =
boolean(config.get('NodeManagerService', 'convertExceptions'))
self.registerFrequency = float(config.get('NodeManagerService',
'registerFrequency'))
@@ -84,7 +88,7 @@
self.log.exception('Failed to save VM info to %s' %
(self.infoFile))
def vmStateChange(self, vmId, old, cur):
- cm = ConnectionManager(clustermanagerservice.Client,
self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password,
self.cmPort)[self.cmHost]
instance = self.getInstance(vmId)
if (old and instance.state != old):
self.log.warning('VM state was %s, call indicated %s' %
(vmStates[instance.state], vmStates[old]))
@@ -103,7 +107,7 @@
return True
def backupVmInfoAndFlushNotifyCM(self):
- cm = ConnectionManager(clustermanagerservice.Client,
self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password,
self.cmPort)[self.cmHost]
while True:
start = time.time()
try:
@@ -135,7 +139,7 @@
time.sleep(toSleep)
def registerWithClusterManager(self):
- cm = ConnectionManager(clustermanagerservice.Client,
self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password,
self.cmPort)[self.cmHost]
while True:
start = time.time()
try:
@@ -154,7 +158,6 @@
raise
TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this
host" % (vmId)})
return instance
- @RPC
def instantiateVm(self, instance):
vmId = self.vmm.instantiateVm(instance)
instance.vmId = vmId
@@ -162,7 +165,6 @@
self.instances[vmId] = instance
return vmId
- @RPC
def suspendVm(self, vmId, destination):
instance = self.getInstance(vmId)
instance.state = InstanceState.Suspending
@@ -173,7 +175,7 @@
instance.state = InstanceState.Running
newInstance =
Instance(d={'id':instance.id,'state':instance.state})
success = lambda: None
- cm = ConnectionManager(clustermanagerservice.Client,
self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password,
self.cmPort)[self.cmHost]
try:
cm.vmUpdate(newInstance.id, newInstance,
InstanceState.Resuming)
except Exception, e:
@@ -182,7 +184,6 @@
else:
success()
- @RPC
def resumeVm(self, instance, name):
instance.state = InstanceState.Resuming
instance.hostId = self.id
@@ -195,7 +196,6 @@
raise
TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the
node manager"})
return instance.vmId
- @RPC
def prepReceiveVm(self, instance, source):
instance.state = InstanceState.MigratePrep
instance.vmId = -1
@@ -206,7 +206,6 @@
self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
del self.instances[instance.vmId]
- @RPC
def migrateVm(self, vmId, target, transportCookie):
instance = self.getInstance(vmId)
instance.state = InstanceState.MigrateTrans
@@ -214,7 +213,7 @@
return
def receiveVmHelper(self, instance, transportCookie):
- cm = ConnectionManager(clustermanagerservice.Client,
self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password,
self.cmPort)[self.cmHost]
vmId = self.vmm.receiveVm(transportCookie)
instance.state = InstanceState.Running
instance.hostId = self.id
@@ -230,48 +229,40 @@
else:
success()
- @RPC
def receiveVm(self, instance, transportCookie):
instance.state = InstanceState.MigrateTrans
threading.Thread(target=self.receiveVmHelper, args=(instance,
transportCookie)).start()
return
- @RPC
def pauseVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.Pausing
self.vmm.pauseVm(vmId)
instance.state = InstanceState.Paused
- @RPC
def unpauseVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.Unpausing
self.vmm.unpauseVm(vmId)
instance.state = InstanceState.Running
- @RPC
def shutdownVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.ShuttingDown
self.vmm.shutdownVm(vmId)
- @RPC
def destroyVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.Destroying
self.vmm.destroyVm(vmId)
- @RPC
def getVmInfo(self, vmId):
instance = self.getInstance(vmId)
return instance
- @RPC
def vmmSpecificCall(self, vmId, arg):
return self.vmm.vmmSpecificCall(vmId, arg)
- @RPC
def listVms(self):
return self.instances.keys()
Index: src/tashi/agents/primitive.py
===================================================================
--- src/tashi/agents/primitive.py (revision 792304)
+++ src/tashi/agents/primitive.py (working copy)
@@ -25,15 +25,14 @@
import time
import logging.config
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.util import getConfig, createClient, instantiateImplementation,
boolean
import tashi
class Primitive(object):
- def __init__(self, config, client, transport):
+ def __init__(self, config, client):
self.config = config
self.client = client
- self.transport = transport
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive",
"scheduleDelay"))
@@ -45,7 +44,7 @@
name = name.lower()
if (name.startswith("hook")):
try:
-
self.hooks.append(instantiateImplementation(value, config, client, transport,
False))
+
self.hooks.append(instantiateImplementation(value, config, client, False))
except:
self.log.exception("Failed to load hook
%s" % (value))
@@ -53,9 +52,6 @@
oldInstances = {}
while True:
try:
- # Make sure transport is open
- if (not self.transport.isOpen()):
- self.transport.open()
# Generate a list of VMs/host
hosts = {}
load = {}
@@ -120,26 +116,18 @@
time.sleep(self.scheduleDelay)
except TashiException, e:
self.log.exception("Tashi exception")
- try:
- self.transport.close()
- except Exception, e:
- self.log.exception("Failed to close the
transport")
time.sleep(self.scheduleDelay)
except Exception, e:
self.log.exception("General exception")
- try:
- self.transport.close()
- except Exception, e:
- self.log.exception("Failed to close the
transport")
time.sleep(self.scheduleDelay)
def main():
(config, configFiles) = getConfig(["Agent"])
publisher = instantiateImplementation(config.get("Agent", "publisher"),
config)
tashi.publisher = publisher
- (client, transport) = createClient(config)
+ client = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, client, transport)
+ agent = Primitive(config, client)
agent.start()
if __name__ == "__main__":
Index: src/tashi/agents/dhcpdns.py
===================================================================
--- src/tashi/agents/dhcpdns.py (revision 792551)
+++ src/tashi/agents/dhcpdns.py (working copy)
@@ -23,7 +23,7 @@
from tashi import boolean
class DhcpDns(InstanceHook):
- def __init__(self, config, client, transport, post=False):
+ def __init__(self, config, client, post=False):
InstanceHook.__init__(self, config, client, post)
self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
Index: src/tashi/agents/instancehook.py
===================================================================
--- src/tashi/agents/instancehook.py (revision 792304)
+++ src/tashi/agents/instancehook.py (working copy)
@@ -18,12 +18,11 @@
# under the License.
class InstanceHook(object):
- def __init__(self, config, client, transport, post=False):
+ def __init__(self, config, client, post=False):
if (self.__class__ is InstanceHook):
raise NotImplementedError
self.config = config
self.client = client
- self.transport = transport
self.post = post
def preCreate(self, instance):
Index: src/tashi/connectionmanager.py
===================================================================
--- src/tashi/connectionmanager.py (revision 792304)
+++ src/tashi/connectionmanager.py (working copy)
@@ -15,49 +15,21 @@
# specific language governing permissions and limitations
# under the License.
-from thrift.transport.TSocket import TSocket, socket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import *
class ConnectionManager(object):
- def __init__(self, clientClass, port, timeout=10000.0):
- self.clientClass = clientClass
+ def __init__(self, username, password, port, timeout=10000.0):
+ self.username = username
+ self.password = password
self.timeout = timeout
self.port = port
- class anonClass(object):
- def __init__(self, clientObject):
- self.co = clientObject
-
- def __getattr__(self, name):
- if (name.startswith("_")):
- return self.__dict__[name]
- def connectWrap(*args, **kw):
- if (not self.co._iprot.trans.isOpen()):
- self.co._iprot.trans.open()
- try:
- res = getattr(self.co, name)(*args,
**kw)
- except socket.error, e:
- # Force a close for the case of a
"Broken pipe"
-# print "Forced a socket close"
- self.co._iprot.trans.close()
- self.co._iprot.trans.open()
- res = getattr(self.co, name)(*args,
**kw)
- self.co._iprot.trans.close()
- raise
- self.co._iprot.trans.close()
- return res
- return connectWrap
-
def __getitem__(self, hostname):
port = self.port
if len(hostname) == 2:
port = hostname[1]
hostname = hostname[0]
- socket = TSocket(hostname, port)
- socket.setTimeout(self.timeout)
- transport = TBufferedTransport(socket)
- protocol = TBinaryProtocol(transport)
- client = self.clientClass(protocol)
- client.__transport__ = transport
- return self.anonClass(client)
+
+ return rpycservices.client(hostname, port,
username=self.username, password=self.password)
Index: src/tashi/rpycservices/__init__.py
===================================================================
--- src/tashi/rpycservices/__init__.py (revision 0)
+++ src/tashi/rpycservices/__init__.py (revision 0)
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import rpyc
Index: src/tashi/rpycservices/rpycservices.py
===================================================================
--- src/tashi/rpycservices/rpycservices.py (revision 0)
+++ src/tashi/rpycservices/rpycservices.py (revision 0)
@@ -0,0 +1,100 @@
+import rpyc
+from tashi.rpycservices.rpyctypes import *
+import cPickle
+
+clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm',
'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks',
'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager',
'vmUpdate', 'activateVm']
+nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm',
'resumeVm', 'prepReceiveVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm',
'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo']
+
+def clean(args):
+ """Cleans the object so cPickle can be used."""
+ if isinstance(args, list) or isinstance(args, tuple):
+ cleanArgs = []
+ for arg in args:
+ cleanArgs.append(clean(arg))
+ if isinstance(args, tuple):
+ cleanArgs = tuple(cleanArgs)
+ return cleanArgs
+ if isinstance(args, Instance):
+ return Instance(args.__dict__)
+ if isinstance(args, Host):
+ return Host(args.__dict__)
+ if isinstance(args, User):
+ user = User(args.__dict__)
+ user.passwd = None
+ return user
+ return args
+
+class client():
+ def __init__(self, host, port, username=None, password=None):
+		"""Client for ManagerService. If username and password are provided, rpyc.tls_connect will be used to connect, else rpyc.connect will be used."""
+ self.host = host
+ self.port = int(port)
+ self.username = username
+ self.password = password
+ self.conn = self.createConn()
+
+	def createConn(self):
+		"""Creates an rpyc connection."""
+ if self.username != None and self.password != None:
+ return rpyc.tls_connect(host=self.host, port=self.port,
username=self.username, password=self.password)
+ else:
+ return rpyc.connect(host=self.host, port=self.port)
+
+ def __getattr__(self, name):
+ """Returns a function that makes the RPC call. No keyword
arguments allowed when calling this function."""
+ if self.conn.closed == True:
+ self.conn = self.createConn()
+ if name not in clusterManagerRPCs and name not in
nodeManagerRPCs:
+ return None
+ def connectWrap(*args):
+ args = cPickle.dumps(clean(args))
+ try:
+ res = getattr(self.conn.root, name)(args)
+ except Exception, e:
+ self.conn.close()
+ raise e
+ res = cPickle.loads(res)
+ if isinstance(res, Exception):
+ raise res
+ return res
+ return connectWrap
+
+class ManagerService(rpyc.Service):
+	"""Rpyc service wrapper: unpickles RPC arguments, checks the caller's permissions, and dispatches the call to self.service."""
+ # Note: self.service and self._type are set before
rpyc.utils.server.ThreadedServer is started.
+	def checkValidUser(self, functionName, clientUsername, args):
+		"""Checks whether the operation requested by the user is valid based on clientUsername. An exception will be raised if not valid."""
+ if self._type == 'NodeManagerService':
+ return
+ if clientUsername in ['nodeManager', 'agent', 'root']:
+ return
+ if functionName in ['destroyVm', 'shutdownVm', 'pauseVm',
'vmmSpecificCall', 'suspendVm', 'unpauseVm', 'migrateVm', 'resumeVm']:
+ instanceId = args[0]
+ instance = self.service.data.getInstance(instanceId)
+ instanceUsername =
self.service.data.getUser(instance.userId).name
+ if clientUsername != instanceUsername:
+ raise Exception('Permission Denied: %s cannot
perform %s on VM owned by %s' % (clientUsername, functionName,
instanceUsername))
+ return
+
+ def _rpyc_getattr(self, name):
+ """Returns the RPC corresponding to the function call"""
+ def makeCall(args):
+ args = cPickle.loads(args)
+ if self._conn._config['credentials'] != None:
+ try:
+ self.checkValidUser(makeCall._name,
self._conn._config['credentials'], args)
+ except Exception, e:
+ e = cPickle.dumps(clean(e))
+ return e
+ try:
+ res = getattr(self.service,
makeCall._name)(*args)
+ except Exception, e:
+ res = e
+ res = cPickle.dumps(clean(res))
+ return res
+ makeCall._name = name
+ if self._type == 'ClusterManagerService' and name in
clusterManagerRPCs:
+ return makeCall
+ if self._type == 'NodeManagerService' and name in
nodeManagerRPCs:
+ return makeCall
+ raise AttributeError('RPC does not exist')
Index: src/tashi/rpycservices/rpyctypes.py
===================================================================
--- src/tashi/rpycservices/rpyctypes.py (revision 0)
+++ src/tashi/rpycservices/rpyctypes.py (revision 0)
@@ -0,0 +1,247 @@
+class Errors(object):
+ ConvertedException = 1
+ NoSuchInstanceId = 2
+ NoSuchVmId = 3
+ IncorrectVmState = 4
+ NoSuchHost = 5
+ NoSuchHostId = 6
+ InstanceIdAlreadyExists = 7
+ HostNameMismatch = 8
+ HostNotUp = 9
+ HostStateError = 10
+ InvalidInstance = 11
+ UnableToResume = 12
+ UnableToSuspend = 13
+
+class InstanceState(object):
+ Pending = 1
+ Activating = 2
+ Running = 3
+ Pausing = 4
+ Paused = 5
+ Unpausing = 6
+ Suspending = 7
+ Resuming = 8
+ MigratePrep = 9
+ MigrateTrans = 10
+ ShuttingDown = 11
+ Destroying = 12
+ Orphaned = 13
+ Held = 14
+ Exited = 15
+ Suspended = 16
+
+class HostState(object):
+ Normal = 1
+ Drained = 2
+ VersionMismatch = 3
+
+class TashiException(Exception):
+ def __init__(self, d=None):
+ self.errno = None
+ self.msg = None
+ if isinstance(d, dict):
+ if 'errno' in d:
+ self.errno = d['errno']
+ if 'msg' in d:
+ self.msg = d['msg']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Host(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.name = None
+ self.up = None
+ self.decayed = None
+ self.state = None
+ self.memory = None
+ self.cores = None
+ self.version = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'name' in d:
+ self.name = d['name']
+ if 'up' in d:
+ self.up = d['up']
+ if 'decayed' in d:
+ self.decayed = d['decayed']
+ if 'state' in d:
+ self.state = d['state']
+ if 'memory' in d:
+ self.memory = d['memory']
+ if 'cores' in d:
+ self.cores = d['cores']
+ if 'version' in d:
+ self.version = d['version']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Network(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.name = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'name' in d:
+ self.name = d['name']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class User(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.name = None
+ self.passwd = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'name' in d:
+ self.name = d['name']
+ if 'passwd' in d:
+ self.passwd = d['passwd']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class DiskConfiguration(object):
+ def __init__(self, d=None):
+ self.uri = None
+ self.persistent = None
+ if isinstance(d, dict):
+ if 'uri' in d:
+ self.uri = d['uri']
+ if 'persistent' in d:
+ self.persistent = d['persistent']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class NetworkConfiguration(object):
+ def __init__(self, d=None):
+ self.network = None
+ self.mac = None
+ self.ip = None
+ if isinstance(d, dict):
+ if 'network' in d:
+ self.network = d['network']
+ if 'mac' in d:
+ self.mac = d['mac']
+ if 'ip' in d:
+ self.ip = d['ip']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Instance(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.vmId = None
+ self.hostId = None
+ self.decayed = None
+ self.state = None
+ self.userId = None
+ self.name = None
+ self.cores = None
+ self.memory = None
+ self.disks = None
+		# Default to an empty list (not None) so callers can always iterate self.nics
+		self.nics = []
+ self.hints = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'vmId' in d:
+ self.vmId = d['vmId']
+ if 'hostId' in d:
+ self.hostId = d['hostId']
+ if 'decayed' in d:
+ self.decayed = d['decayed']
+ if 'state' in d:
+ self.state = d['state']
+ if 'userId' in d:
+ self.userId = d['userId']
+ if 'name' in d:
+ self.name = d['name']
+ if 'cores' in d:
+ self.cores = d['cores']
+ if 'memory' in d:
+ self.memory = d['memory']
+ if 'disks' in d:
+ self.disks = d['disks']
+ if 'nics' in d:
+ self.nics = d['nics']
+ if 'hints' in d:
+ self.hints = d['hints']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ ==
other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
Index: src/tashi/util.py
===================================================================
--- src/tashi/util.py (revision 792304)
+++ src/tashi/util.py (working copy)
@@ -26,13 +26,11 @@
import time
import traceback
import types
+import getpass
-from thrift.transport.TSocket import TServerSocket, TSocket
-from thrift.server.TServer import TThreadedServer
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from tashi.services import clustermanagerservice
-from tashi.services.ttypes import TashiException, Errors, InstanceState,
HostState
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import TashiException, Errors,
InstanceState, HostState
def broken(oldFunc):
"""Decorator that is used to mark a function as temporarily broken"""
@@ -269,15 +267,20 @@
host = os.getenv('TASHI_CM_HOST', cfgHost)
port = os.getenv('TASHI_CM_PORT', cfgPort)
timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
- socket = TSocket(host, int(port))
- socket.setTimeout(timeout)
- transport = TBufferedTransport(socket)
- protocol = TBinaryProtocol(transport)
- client = clustermanagerservice.Client(protocol)
- transport.open()
- client._transport = transport
- return (client, transport)
+ authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+ if authAndEncrypt:
+ username = config.get('AccessClusterManager', 'username')
+ if username == '':
+ username = raw_input('Enter Username:')
+ password = config.get('AccessClusterManager', 'password')
+ if password == '':
+ password = getpass.getpass('Enter Password:')
+ client = rpycservices.client(host, port, username=username,
password=password)
+ else:
+ client = rpycservices.client(host, port)
+ return client
+
def enumToStringDict(cls):
d = {}
for i in cls.__dict__:
Index: src/tashi/clustermanager/clustermanager.py
===================================================================
--- src/tashi/clustermanager/clustermanager.py (revision 792304)
+++ src/tashi/clustermanager/clustermanager.py (working copy)
@@ -24,27 +24,41 @@
import logging.config
from getopt import getopt, GetoptError
from ConfigParser import ConfigParser
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
-from tashi.services import clustermanagerservice
from tashi.util import signalHandler, boolean, instantiateImplementation,
getConfig, debugConsole
import tashi
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
+
def startClusterManager(config):
global service, data
dfs = instantiateImplementation(config.get("ClusterManager", "dfs"),
config)
data = instantiateImplementation(config.get("ClusterManager", "data"),
config)
service = instantiateImplementation(config.get("ClusterManager",
"service"), config, data, dfs)
- processor = clustermanagerservice.Processor(service)
- transport = TServerSocket(int(config.get('ClusterManagerService',
'port')))
- server = TThreadedServer(processor, transport)
-
+
+ if boolean(config.get("Security", "authAndEncrypt")):
+ users = {}
+ userDatabase = data.getUsers()
+ for user in userDatabase.values():
+ if user.passwd != None:
+ users[user.name] = user.passwd
+ users[config.get('AllowedUsers', 'nodeManagerUser')] =
config.get('AllowedUsers', 'nodeManagerPassword')
+ users[config.get('AllowedUsers', 'agentUser')] =
config.get('AllowedUsers', 'agentPassword')
+ authenticator = VdbAuthenticator.from_dict(users)
+ t = ThreadedServer(service=rpycservices.ManagerService,
hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')),
auto_register=False, authenticator=authenticator)
+ else:
+ t = ThreadedServer(service=rpycservices.ManagerService,
hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')),
auto_register=False)
+ t.logger.quiet = True
+ t.service.service = service
+ t.service._type = 'ClusterManagerService'
+
debugConsole(globals())
try:
- server.serve()
+ t.start()
except KeyboardInterrupt:
handleSIGTERM(signal.SIGTERM, None)
Index: src/tashi/clustermanager/clustermanagerservice.py
===================================================================
--- src/tashi/clustermanager/clustermanagerservice.py (revision 792304)
+++ src/tashi/clustermanager/clustermanagerservice.py (working copy)
@@ -18,28 +18,28 @@
from datetime import datetime
from random import randint
from socket import gethostname
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
import logging
import threading
import time
-from tashi.services.ttypes import Errors, InstanceState, HostState,
TashiException
-from tashi.services import nodemanagerservice
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState,
TashiException
from tashi import boolean, convertExceptions, ConnectionManager, vmStates,
timed, version, scrubString
-def RPC(oldFunc):
- return convertExceptions(oldFunc)
-
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
def __init__(self, config, data, dfs):
self.config = config
self.data = data
+ self.authAndEncrypt = boolean(config.get('Security',
'authAndEncrypt'))
+ if self.authAndEncrypt:
+ self.username = config.get('AccessNodeManager',
'username')
+ self.password = config.get('AccessNodeManager',
'password')
+ else:
+ self.username = None
+ self.password = None
+ self.proxy = ConnectionManager(self.username, self.password,
int(self.config.get('ClusterManager', 'nodeManagerPort')))
self.dfs = dfs
- self.proxy = ConnectionManager(nodemanagerservice.Client,
int(self.config.get('ClusterManager', 'nodeManagerPort')))
self.convertExceptions =
boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
self.lastContacted = {}
@@ -181,7 +181,6 @@
del instance.hints[hint]
return instance
- @RPC
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
instance = self.normalize(instance)
@@ -189,7 +188,6 @@
self.data.releaseInstance(instance)
return instance
- @RPC
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Running,
InstanceState.ShuttingDown)
@@ -202,7 +200,6 @@
raise
return
- @RPC
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state
is InstanceState.Held):
@@ -221,7 +218,6 @@
raise
return
- @RPC
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Running,
InstanceState.Suspending)
@@ -235,7 +231,6 @@
raise TashiException(d={'errno':Errors.UnableToSuspend,
'msg':'Failed to suspend %s' % (instance.name)})
return
- @RPC
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Suspended,
InstanceState.Pending)
@@ -244,7 +239,6 @@
self.data.releaseInstance(instance)
return instance
- @RPC
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
try:
@@ -285,7 +279,6 @@
raise
return
- @RPC
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Running,
InstanceState.Pausing)
@@ -301,7 +294,6 @@
self.data.releaseInstance(instance)
return
- @RPC
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Paused,
InstanceState.Unpausing)
@@ -317,23 +309,18 @@
self.data.releaseInstance(instance)
return
- @RPC
def getHosts(self):
return self.data.getHosts().values()
- @RPC
def getNetworks(self):
return self.data.getNetworks().values()
- @RPC
def getUsers(self):
return self.data.getUsers().values()
- @RPC
def getInstances(self):
return self.data.getInstances().values()
- @RPC
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
@@ -345,7 +332,6 @@
return res
# @timed
- @RPC
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state
polling -- state changes here are NOT AUTHORITATIVE"""
if (host.id == None):
@@ -408,7 +394,6 @@
self.data.releaseHost(oldHost)
return host.id
- @RPC
def vmUpdate(self, instanceId, instance, oldState):
try:
oldInstance = self.data.acquireInstance(instanceId)
@@ -450,7 +435,6 @@
self.data.releaseInstance(oldInstance)
return
- @RPC
def activateVm(self, instanceId, host):
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
Index: src/tashi/clustermanager/data/fromconfig.py
===================================================================
--- src/tashi/clustermanager/data/fromconfig.py (revision 792304)
+++ src/tashi/clustermanager/data/fromconfig.py (working copy)
@@ -17,7 +17,7 @@
import threading
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
@@ -128,7 +128,7 @@
self.releaseLock(host._lock)
def getHosts(self):
- return self.hosts
+ return self.cleanHosts()
def getHost(self, id):
host = self.hosts.get(id, None)
Index: src/tashi/clustermanager/data/pickled.py
===================================================================
--- src/tashi/clustermanager/data/pickled.py (revision 792304)
+++ src/tashi/clustermanager/data/pickled.py (working copy)
@@ -18,7 +18,7 @@
import cPickle
import os
import threading
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data import FromConfig, DataInterface
class Pickled(FromConfig):
Index: src/tashi/clustermanager/data/sql.py
===================================================================
--- src/tashi/clustermanager/data/sql.py (revision 792304)
+++ src/tashi/clustermanager/data/sql.py (working copy)
@@ -19,7 +19,7 @@
import threading
import time
import types
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data.datainterface import DataInterface
from tashi.util import stringPartition, boolean
@@ -74,7 +74,7 @@
self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id
int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL,
state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT
NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024)
NOT NULL, hints varchar(1024) NOT NULL)")
self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id
INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0,
decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores
int(11), version varchar(256))")
self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id
int(11) NOT NULL, name varchar(256) NOT NULL)")
- self.executeStatement("CREATE TABLE IF NOT EXISTS users (id
int(11) NOT NULL, name varchar(256) NOT NULL)")
+ self.executeStatement("CREATE TABLE IF NOT EXISTS users (id
int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
def sanitizeForSql(self, s):
if (s == '"True"'):
@@ -247,12 +247,12 @@
res = cur.fetchall()
users = {}
for r in res:
- user = User(d={'id':r[0], 'name':r[1]})
+ user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
users[user.id] = user
return users
def getUser(self, id):
cur = self.executeStatement("SELECT * FROM users WHERE id = %d"
% (id))
r = cur.fetchone()
- user = User(d={'id':r[0], 'name':r[1]})
+ user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
return user
Index: src/tashi/clustermanager/data/ldapoverride.py
===================================================================
--- src/tashi/clustermanager/data/ldapoverride.py (revision 792304)
+++ src/tashi/clustermanager/data/ldapoverride.py (working copy)
@@ -17,7 +17,7 @@
import subprocess
import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation
Index: src/tashi/clustermanager/data/getentoverride.py
===================================================================
--- src/tashi/clustermanager/data/getentoverride.py (revision 792304)
+++ src/tashi/clustermanager/data/getentoverride.py (working copy)
@@ -17,7 +17,7 @@
import subprocess
import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation
Index: src/tashi/client/tashi-client.py
===================================================================
--- src/tashi/client/tashi-client.py (revision 792304)
+++ src/tashi/client/tashi-client.py (working copy)
@@ -21,12 +21,7 @@
import random
import sys
import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import *
from tashi import vmStates, hostStates, boolean, getConfig, stringPartition,
createClient
users = {}
@@ -46,7 +41,10 @@
def getUser():
fetchUsers()
- userStr = os.getenv("USER", "unknown")
+ if client.username != None:
+ userStr = client.username
+ else:
+ userStr = os.getenv("USER", "unknown")
for user in users:
if (users[user].name == userStr):
return users[user].id
@@ -66,7 +64,7 @@
raise ValueError("Unknown instance %s" % (str(instance)))
for instance in instances:
if (instance.id == instanceId):
- if (instance.userId != userId and instance.userId !=
None):
+ if (instance.userId != userId and instance.userId !=
None and userId != 0):
raise ValueError("You don't own that VM")
return instanceId
@@ -461,7 +459,7 @@
usage(function)
try:
vals = {}
- (client, transport) = createClient(config)
+ client = createClient(config)
for parg in possibleArgs:
(parg, conv, default, required) = parg
val = None
@@ -498,8 +496,6 @@
print "TashiException:"
print e.msg
exitCode = e.errno
- except TTransportException, e:
- print e
except Exception, e:
print e
usage(function)
Index: etc/TashiDefaults.cfg
===================================================================
--- etc/TashiDefaults.cfg (revision 792304)
+++ etc/TashiDefaults.cfg (working copy)
@@ -15,6 +15,27 @@
# specific language governing permissions and limitations
# under the License.
+[Security]
+authAndEncrypt = False
+
+[AccessClusterManager]
+# If the username and password are left empty, the user will be prompted for a
username and password on the command line.
+username = nodemanager
+password = nodemanager
+
+[AccessNodeManager]
+# If the username and password are left empty, the user will be prompted for a
username and password on the command line.
+username = clustermanager
+password = clustermanager
+
+[AllowedUsers]
+nodeManagerUser = nodemanager
+nodeManagerPassword = nodemanager
+agentUser = agent
+agentPassword = agent
+clusterManagerUser = clustermanager
+clusterManagerPassword = clustermanager
+
# ClusterManager portion
[ClusterManager]
service = tashi.clustermanager.ClusterManagerService
This patch replaces Thrift with RPyC as the RPC layer. RPyC supports
authentication and encryption via a username and password combination.
