Here is a patch that replaces thrift with rpyc, which supports authentication and encryption through a username and password combination.
Index: src/tashi/nodemanager/vmcontrol/xenpv.py
===================================================================
--- src/tashi/nodemanager/vmcontrol/xenpv.py    (revision 792304)
+++ src/tashi/nodemanager/vmcontrol/xenpv.py    (working copy)
@@ -25,8 +25,8 @@
 import socket
 
 from vmcontrolinterface import VmControlInterface
-from tashi.services.ttypes import Errors, InstanceState, TashiException
-from tashi.services.ttypes import Instance, Host
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
+from tashi.rpycservices.rpyctypes import Instance, Host
 from tashi import boolean, convertExceptions, ConnectionManager, version
 from tashi.util import isolatedRPC, broken
 
Index: src/tashi/nodemanager/vmcontrol/qemu.py
===================================================================
--- src/tashi/nodemanager/vmcontrol/qemu.py     (revision 792304)
+++ src/tashi/nodemanager/vmcontrol/qemu.py     (working copy)
@@ -27,7 +27,7 @@
 import sys
 import time
 
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.util import broken, logged, scrubString, boolean
 from tashi import version, stringPartition
 from vmcontrolinterface import VmControlInterface
Index: src/tashi/nodemanager/nodemanager.py
===================================================================
--- src/tashi/nodemanager/nodemanager.py        (revision 792304)
+++ src/tashi/nodemanager/nodemanager.py        (working copy)
@@ -20,14 +20,16 @@
 import logging.config
 import signal
 import sys
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
 
 from tashi.util import instantiateImplementation, getConfig, debugConsole, 
signalHandler
-from tashi.services import nodemanagerservice, clustermanagerservice
 from tashi import ConnectionManager
 import tashi
+from tashi import boolean
 
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
+
 @signalHandler(signal.SIGTERM)
 def handleSIGTERM(signalNumber, stackFrame):
        sys.exit(0)
@@ -45,13 +47,22 @@
        vmm = instantiateImplementation(config.get("NodeManager", "vmm"), 
config, dfs, None)
        service = instantiateImplementation(config.get("NodeManager", 
"service"), config, vmm)
        vmm.nm = service
-       processor = nodemanagerservice.Processor(service)
-       transport = TServerSocket(int(config.get('NodeManagerService', 'port')))
-       server = TThreadedServer(processor, transport)
+
+       if boolean(config.get("Security", "authAndEncrypt")):
+               users = {}
+               users[config.get('AllowedUsers', 'clusterManagerUser')] = 
config.get('AllowedUsers', 'clusterManagerPassword')
+               authenticator = VdbAuthenticator.from_dict(users)
+               t = ThreadedServer(service=rpycservices.ManagerService, 
hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), 
auto_register=False, authenticator=authenticator)
+       else:
+               t = ThreadedServer(service=rpycservices.ManagerService, 
hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), 
auto_register=False)
+       t.logger.quiet = True
+       t.service.service = service
+       t.service._type = 'NodeManagerService'
+
        debugConsole(globals())
        
        try:
-               server.serve()
+               t.start()
        except KeyboardInterrupt:
                handleSIGTERM(signal.SIGTERM, None)
        except Exception, e:
Index: src/tashi/nodemanager/nodemanagerservice.py
===================================================================
--- src/tashi/nodemanager/nodemanagerservice.py (revision 792304)
+++ src/tashi/nodemanager/nodemanagerservice.py (working copy)
@@ -22,16 +22,13 @@
 import sys
 import threading
 import time
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
 
-from tashi.services.ttypes import Host, HostState, InstanceState, 
TashiException, Errors, Instance
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, 
TashiException, Errors, Instance
 from tashi.nodemanager import RPC
 from tashi import boolean, vmStates, logged, ConnectionManager, timed
 import tashi
 
+
 class NodeManagerService(object):
        """RPC handler for the NodeManager
           
@@ -43,6 +40,13 @@
                self.vmm = vmm
                self.cmHost = config.get("NodeManagerService", 
"clusterManagerHost")
                self.cmPort = int(config.get("NodeManagerService", 
"clusterManagerPort"))
+               self.authAndEncrypt = boolean(config.get('Security', 
'authAndEncrypt'))
+               if self.authAndEncrypt:
+                       self.username = config.get('AccessClusterManager', 
'username')
+                       self.password = config.get('AccessClusterManager', 
'password')
+               else:
+                       self.username = None
+                       self.password = None
                self.log = logging.getLogger(__file__)
                self.convertExceptions = 
boolean(config.get('NodeManagerService', 'convertExceptions'))
                self.registerFrequency = float(config.get('NodeManagerService', 
'registerFrequency'))
@@ -84,7 +88,7 @@
                        self.log.exception('Failed to save VM info to %s' % 
(self.infoFile))
        
        def vmStateChange(self, vmId, old, cur):
-               cm = ConnectionManager(clustermanagerservice.Client, 
self.cmPort)[self.cmHost]
+               cm = ConnectionManager(self.username, self.password, 
self.cmPort)[self.cmHost]
                instance = self.getInstance(vmId)
                if (old and instance.state != old):
                        self.log.warning('VM state was %s, call indicated %s' % 
(vmStates[instance.state], vmStates[old]))
@@ -103,7 +107,7 @@
                return True
        
        def backupVmInfoAndFlushNotifyCM(self):
-               cm = ConnectionManager(clustermanagerservice.Client, 
self.cmPort)[self.cmHost]
+               cm = ConnectionManager(self.username, self.password, 
self.cmPort)[self.cmHost]
                while True:
                        start = time.time()
                        try:
@@ -135,7 +139,7 @@
                                time.sleep(toSleep)
        
        def registerWithClusterManager(self):
-               cm = ConnectionManager(clustermanagerservice.Client, 
self.cmPort)[self.cmHost]
+               cm = ConnectionManager(self.username, self.password, 
self.cmPort)[self.cmHost]
                while True:
                        start = time.time()
                        try:
@@ -154,7 +158,6 @@
                        raise 
TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this 
host" % (vmId)})
                return instance
        
-       @RPC
        def instantiateVm(self, instance):
                vmId = self.vmm.instantiateVm(instance)
                instance.vmId = vmId
@@ -162,7 +165,6 @@
                self.instances[vmId] = instance
                return vmId
        
-       @RPC
        def suspendVm(self, vmId, destination):
                instance = self.getInstance(vmId)
                instance.state = InstanceState.Suspending
@@ -173,7 +175,7 @@
                instance.state = InstanceState.Running
                newInstance = 
Instance(d={'id':instance.id,'state':instance.state})
                success = lambda: None
-               cm = ConnectionManager(clustermanagerservice.Client, 
self.cmPort)[self.cmHost]
+               cm = ConnectionManager(self.username, self.password, 
self.cmPort)[self.cmHost]
                try:
                        cm.vmUpdate(newInstance.id, newInstance, 
InstanceState.Resuming)
                except Exception, e:
@@ -182,7 +184,6 @@
                else:
                        success()
        
-       @RPC
        def resumeVm(self, instance, name):
                instance.state = InstanceState.Resuming
                instance.hostId = self.id
@@ -195,7 +196,6 @@
                        raise 
TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the 
node manager"})
                return instance.vmId
        
-       @RPC
        def prepReceiveVm(self, instance, source):
                instance.state = InstanceState.MigratePrep
                instance.vmId = -1
@@ -206,7 +206,6 @@
                self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
                del self.instances[instance.vmId]
                
-       @RPC
        def migrateVm(self, vmId, target, transportCookie):
                instance = self.getInstance(vmId)
                instance.state = InstanceState.MigrateTrans
@@ -214,7 +213,7 @@
                return
        
        def receiveVmHelper(self, instance, transportCookie):
-               cm = ConnectionManager(clustermanagerservice.Client, 
self.cmPort)[self.cmHost]
+               cm = ConnectionManager(self.username, self.password, 
self.cmPort)[self.cmHost]
                vmId = self.vmm.receiveVm(transportCookie)
                instance.state = InstanceState.Running
                instance.hostId = self.id
@@ -230,48 +229,40 @@
                else:
                        success()
        
-       @RPC
        def receiveVm(self, instance, transportCookie):
                instance.state = InstanceState.MigrateTrans
                threading.Thread(target=self.receiveVmHelper, args=(instance, 
transportCookie)).start()
                return
        
-       @RPC
        def pauseVm(self, vmId):
                instance = self.getInstance(vmId)
                instance.state = InstanceState.Pausing
                self.vmm.pauseVm(vmId)
                instance.state = InstanceState.Paused
        
-       @RPC
        def unpauseVm(self, vmId):
                instance = self.getInstance(vmId)
                instance.state = InstanceState.Unpausing
                self.vmm.unpauseVm(vmId)
                instance.state = InstanceState.Running
        
-       @RPC
        def shutdownVm(self, vmId):
                instance = self.getInstance(vmId)
                instance.state = InstanceState.ShuttingDown
                self.vmm.shutdownVm(vmId)
        
-       @RPC
        def destroyVm(self, vmId):
                instance = self.getInstance(vmId)
                instance.state = InstanceState.Destroying
                self.vmm.destroyVm(vmId)
        
-       @RPC
        def getVmInfo(self, vmId):
                instance = self.getInstance(vmId)
                return instance
        
-       @RPC
        def vmmSpecificCall(self, vmId, arg):
                return self.vmm.vmmSpecificCall(vmId, arg)
        
-       @RPC
        def listVms(self):
                return self.instances.keys()
        
Index: src/tashi/agents/primitive.py
===================================================================
--- src/tashi/agents/primitive.py       (revision 792304)
+++ src/tashi/agents/primitive.py       (working copy)
@@ -25,15 +25,14 @@
 import time
 import logging.config
 
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.util import getConfig, createClient, instantiateImplementation, 
boolean
 import tashi
 
 class Primitive(object):
-       def __init__(self, config, client, transport):
+       def __init__(self, config, client):
                self.config = config
                self.client = client
-               self.transport = transport
                self.hooks = []
                self.log = logging.getLogger(__file__)
                self.scheduleDelay = float(self.config.get("Primitive", 
"scheduleDelay"))
@@ -45,7 +44,7 @@
                        name = name.lower()
                        if (name.startswith("hook")):
                                try:
-                                       
self.hooks.append(instantiateImplementation(value, config, client, transport, 
False))
+                                       
self.hooks.append(instantiateImplementation(value, config, client, False))
                                except:
                                        self.log.exception("Failed to load hook 
%s" % (value))
        
@@ -53,9 +52,6 @@
                oldInstances = {}
                while True:
                        try:
-                               # Make sure transport is open
-                               if (not self.transport.isOpen()):
-                                       self.transport.open()
                                # Generate a list of VMs/host
                                hosts = {}
                                load = {}
@@ -120,26 +116,18 @@
                                time.sleep(self.scheduleDelay)
                        except TashiException, e:
                                self.log.exception("Tashi exception")
-                               try:
-                                       self.transport.close()
-                               except Exception, e:
-                                       self.log.exception("Failed to close the 
transport")
                                time.sleep(self.scheduleDelay)
                        except Exception, e:
                                self.log.exception("General exception")
-                               try:
-                                       self.transport.close()
-                               except Exception, e:
-                                       self.log.exception("Failed to close the 
transport")
                                time.sleep(self.scheduleDelay)
 
 def main():
        (config, configFiles) = getConfig(["Agent"])
        publisher = instantiateImplementation(config.get("Agent", "publisher"), 
config)
        tashi.publisher = publisher
-       (client, transport) = createClient(config)
+       client = createClient(config)
        logging.config.fileConfig(configFiles)
-       agent = Primitive(config, client, transport)
+       agent = Primitive(config, client)
        agent.start()
 
 if __name__ == "__main__":
Index: src/tashi/agents/dhcpdns.py
===================================================================
--- src/tashi/agents/dhcpdns.py (revision 792551)
+++ src/tashi/agents/dhcpdns.py (working copy)
@@ -23,7 +23,7 @@
 from tashi import boolean
 
 class DhcpDns(InstanceHook):
-       def __init__(self, config, client, transport, post=False):
+       def __init__(self, config, client, post=False):
                InstanceHook.__init__(self, config, client, post)
                self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
                self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
Index: src/tashi/agents/instancehook.py
===================================================================
--- src/tashi/agents/instancehook.py    (revision 792304)
+++ src/tashi/agents/instancehook.py    (working copy)
@@ -18,12 +18,11 @@
 # under the License.    
 
 class InstanceHook(object):
-       def __init__(self, config, client, transport, post=False):
+       def __init__(self, config, client, post=False):
                if (self.__class__ is InstanceHook):
                        raise NotImplementedError
                self.config = config
                self.client = client
-               self.transport = transport
                self.post = post
        
        def preCreate(self, instance):
Index: src/tashi/connectionmanager.py
===================================================================
--- src/tashi/connectionmanager.py      (revision 792304)
+++ src/tashi/connectionmanager.py      (working copy)
@@ -15,49 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from thrift.transport.TSocket import TSocket, socket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import *
 
 class ConnectionManager(object):
-       def __init__(self, clientClass, port, timeout=10000.0):
-               self.clientClass = clientClass
+       def __init__(self, username, password, port, timeout=10000.0):
+               self.username = username
+               self.password = password
                self.timeout = timeout
                self.port = port
        
-       class anonClass(object):
-               def __init__(self, clientObject):
-                       self.co = clientObject
-               
-               def __getattr__(self, name):
-                       if (name.startswith("_")):
-                               return self.__dict__[name]
-                       def connectWrap(*args, **kw):
-                               if (not self.co._iprot.trans.isOpen()):
-                                       self.co._iprot.trans.open()
-                               try:
-                                       res = getattr(self.co, name)(*args, 
**kw)
-                               except socket.error, e:
-                                       # Force a close for the case of a 
"Broken pipe"
-#                                      print "Forced a socket close"
-                                       self.co._iprot.trans.close()
-                                       self.co._iprot.trans.open()
-                                       res = getattr(self.co, name)(*args, 
**kw)
-                                       self.co._iprot.trans.close()
-                                       raise
-                               self.co._iprot.trans.close()
-                               return res
-                       return connectWrap
-       
        def __getitem__(self, hostname):
                port = self.port
                if len(hostname) == 2:
                        port = hostname[1]
                        hostname = hostname[0]
-               socket = TSocket(hostname, port)
-               socket.setTimeout(self.timeout)
-               transport = TBufferedTransport(socket)
-               protocol = TBinaryProtocol(transport)
-               client = self.clientClass(protocol)
-               client.__transport__ = transport
-               return self.anonClass(client)
+
+               return rpycservices.client(hostname, port, 
username=self.username, password=self.password)
Index: src/tashi/rpycservices/__init__.py
===================================================================
--- src/tashi/rpycservices/__init__.py  (revision 0)
+++ src/tashi/rpycservices/__init__.py  (revision 0)
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+import rpyc
Index: src/tashi/rpycservices/rpycservices.py
===================================================================
--- src/tashi/rpycservices/rpycservices.py      (revision 0)
+++ src/tashi/rpycservices/rpycservices.py      (revision 0)
@@ -0,0 +1,100 @@
+import rpyc
+from tashi.rpycservices.rpyctypes import *
+import cPickle
+
+clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 
'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 
'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 
'vmUpdate', 'activateVm']
+nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 
'resumeVm', 'prepReceiveVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 
'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo']
+
+def clean(args):
+       """Cleans the object so cPickle can be used."""
+       if isinstance(args, list) or isinstance(args, tuple):
+               cleanArgs = []
+               for arg in args:
+                       cleanArgs.append(clean(arg))
+               if isinstance(args, tuple):
+                       cleanArgs = tuple(cleanArgs)
+               return cleanArgs
+       if isinstance(args, Instance):
+               return Instance(args.__dict__)
+       if isinstance(args, Host):
+               return Host(args.__dict__)
+       if isinstance(args, User):
+               user = User(args.__dict__)
+               user.passwd = None
+               return user
+       return args
+
+class client():
+       def __init__(self, host, port, username=None, password=None):
+               """Client for ManagerService. If username and password are 
provided, rpyc.tls_connect will be used to connect, else rpyc.connect will be 
used."""
+               self.host = host
+               self.port = int(port)
+               self.username = username
+               self.password = password
+               self.conn = self.createConn()
+       
+       def createConn(self):
+               """Creates a rpyc connection."""
+               if self.username != None and self.password != None:
+                       return rpyc.tls_connect(host=self.host, port=self.port, 
username=self.username, password=self.password)
+               else:
+                       return rpyc.connect(host=self.host, port=self.port)
+
+       def __getattr__(self, name):
+               """Returns a function that makes the RPC call. No keyword 
arguments allowed when calling this function."""
+               if self.conn.closed == True:
+                       self.conn = self.createConn()
+               if name not in clusterManagerRPCs and name not in 
nodeManagerRPCs:
+                       return None
+               def connectWrap(*args):
+                       args = cPickle.dumps(clean(args))
+                       try:
+                               res = getattr(self.conn.root, name)(args)
+                       except Exception, e:
+                               self.conn.close()
+                               raise e
+                       res = cPickle.loads(res)
+                       if isinstance(res, Exception):
+                               raise res
+                       return res
+               return connectWrap
+
+class ManagerService(rpyc.Service):
+       """Wrapper for rpyc service"""
+       # Note: self.service and self._type are set before 
rpyc.utils.server.ThreadedServer is started.
+       def checkValidUser(self, functionName, clientUsername, args):
+               """Checks whether the operation requested by the user is valid 
based on clientUsername. An exception will be thrown if not valid."""
+               if self._type == 'NodeManagerService':
+                       return
+               if clientUsername in ['nodeManager', 'agent', 'root']:
+                       return
+               if functionName in ['destroyVm', 'shutdownVm', 'pauseVm', 
'vmmSpecificCall', 'suspendVm', 'unpauseVm', 'migrateVm', 'resumeVm']:
+                       instanceId = args[0]
+                       instance = self.service.data.getInstance(instanceId)
+                       instanceUsername = 
self.service.data.getUser(instance.userId).name
+                       if clientUsername != instanceUsername:
+                               raise Exception('Permission Denied: %s cannot 
perform %s on VM owned by %s' % (clientUsername, functionName, 
instanceUsername))
+               return
+               
+       def _rpyc_getattr(self, name):
+               """Returns the RPC corresponding to the function call"""
+               def makeCall(args):
+                       args = cPickle.loads(args)
+                       if self._conn._config['credentials'] != None:
+                               try:
+                                       self.checkValidUser(makeCall._name, 
self._conn._config['credentials'], args)
+                               except Exception, e:
+                                       e = cPickle.dumps(clean(e))
+                                       return e
+                       try:
+                               res = getattr(self.service, 
makeCall._name)(*args)
+                       except Exception, e:
+                               res = e
+                       res = cPickle.dumps(clean(res))
+                       return res
+               makeCall._name = name
+               if self._type == 'ClusterManagerService' and name in 
clusterManagerRPCs:
+                       return makeCall
+               if self._type == 'NodeManagerService' and name in 
nodeManagerRPCs:
+                       return makeCall
+               raise AttributeError('RPC does not exist')
Index: src/tashi/rpycservices/rpyctypes.py
===================================================================
--- src/tashi/rpycservices/rpyctypes.py (revision 0)
+++ src/tashi/rpycservices/rpyctypes.py (revision 0)
@@ -0,0 +1,247 @@
+class Errors(object):
+       ConvertedException = 1
+       NoSuchInstanceId = 2
+       NoSuchVmId = 3
+       IncorrectVmState = 4
+       NoSuchHost = 5
+       NoSuchHostId = 6
+       InstanceIdAlreadyExists = 7
+       HostNameMismatch = 8
+       HostNotUp = 9
+       HostStateError = 10
+       InvalidInstance = 11
+       UnableToResume = 12
+       UnableToSuspend = 13
+
+class InstanceState(object):
+       Pending = 1
+       Activating = 2
+       Running = 3
+       Pausing = 4
+       Paused = 5
+       Unpausing = 6
+       Suspending = 7
+       Resuming = 8
+       MigratePrep = 9
+       MigrateTrans = 10
+       ShuttingDown = 11
+       Destroying = 12
+       Orphaned = 13
+       Held = 14
+       Exited = 15
+       Suspended = 16
+
+class HostState(object):
+       Normal = 1
+       Drained = 2
+       VersionMismatch = 3
+
+class TashiException(Exception):
+       def __init__(self, d=None):
+               self.errno = None
+               self.msg = None
+               if isinstance(d, dict):
+                       if 'errno' in d:
+                               self.errno = d['errno']
+                       if 'msg' in d:
+                               self.msg = d['msg']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
+class Host(object):
+       def __init__(self, d=None):
+               self.id = None
+               self.name = None
+               self.up = None
+               self.decayed = None
+               self.state = None
+               self.memory = None
+               self.cores = None
+               self.version = None
+               if isinstance(d, dict):
+                       if 'id' in d:
+                               self.id = d['id']
+                       if 'name' in d:
+                               self.name = d['name']
+                       if 'up' in d:
+                               self.up = d['up']
+                       if 'decayed' in d:
+                               self.decayed = d['decayed']
+                       if 'state' in d:
+                               self.state = d['state']
+                       if 'memory' in d:
+                               self.memory = d['memory']
+                       if 'cores' in d:
+                               self.cores = d['cores']
+                       if 'version' in d:
+                               self.version = d['version']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
+class Network(object):
+       def __init__(self, d=None):
+               self.id = None
+               self.name = None
+               if isinstance(d, dict):
+                       if 'id' in d:
+                               self.id = d['id']
+                       if 'name' in d:
+                               self.name = d['name']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
+class User(object):
+       def __init__(self, d=None):
+               self.id = None
+               self.name = None
+               self.passwd = None
+               if isinstance(d, dict):
+                       if 'id' in d:
+                               self.id = d['id']
+                       if 'name' in d:
+                               self.name = d['name']
+                       if 'passwd' in d:
+                               self.passwd = d['passwd']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
+class DiskConfiguration(object):
+       def __init__(self, d=None):
+               self.uri = None
+               self.persistent = None
+               if isinstance(d, dict):
+                       if 'uri' in d:
+                               self.uri = d['uri']
+                       if 'persistent' in d:
+                               self.persistent = d['persistent']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
+class NetworkConfiguration(object):
+       def __init__(self, d=None):
+               self.network = None
+               self.mac = None
+               self.ip = None
+               if isinstance(d, dict):
+                       if 'network' in d:
+                               self.network = d['network']
+                       if 'mac' in d:
+                               self.mac = d['mac']
+                       if 'ip' in d:
+                               self.ip = d['ip']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
+class Instance(object):
+       def __init__(self, d=None):
+               self.id = None
+               self.vmId = None
+               self.hostId = None
+               self.decayed = None
+               self.state = None
+               self.userId = None
+               self.name = None
+               self.cores = None
+               self.memory = None
+               self.disks = None
+               #Quick fix so self.nics is not None
+               self.nics = []
+               self.hints = None
+               if isinstance(d, dict):
+                       if 'id' in d:
+                               self.id = d['id']
+                       if 'vmId' in d:
+                               self.vmId = d['vmId']
+                       if 'hostId' in d:
+                               self.hostId = d['hostId']
+                       if 'decayed' in d:
+                               self.decayed = d['decayed']
+                       if 'state' in d:
+                               self.state = d['state']
+                       if 'userId' in d:
+                               self.userId = d['userId']
+                       if 'name' in d:
+                               self.name = d['name']
+                       if 'cores' in d:
+                               self.cores = d['cores']
+                       if 'memory' in d:
+                               self.memory = d['memory']
+                       if 'disks' in d:
+                               self.disks = d['disks']
+                       if 'nics' in d:
+                               self.nics = d['nics']
+                       if 'hints' in d:
+                               self.hints = d['hints']
+
+       def __str__(self): 
+               return str(self.__dict__)
+
+       def __repr__(self): 
+               return repr(self.__dict__)
+
+       def __eq__(self, other):
+               return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+       def __ne__(self, other):
+               return not (self == other)
+
Index: src/tashi/util.py
===================================================================
--- src/tashi/util.py   (revision 792304)
+++ src/tashi/util.py   (working copy)
@@ -26,13 +26,11 @@
 import time
 import traceback
 import types
+import getpass
 
-from thrift.transport.TSocket import TServerSocket, TSocket
-from thrift.server.TServer import TThreadedServer
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from tashi.services import clustermanagerservice
-from tashi.services.ttypes import TashiException, Errors, InstanceState, 
HostState
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import TashiException, Errors, 
InstanceState, HostState
 
 def broken(oldFunc):
        """Decorator that is used to mark a function as temporarily broken"""
@@ -269,15 +267,20 @@
        host = os.getenv('TASHI_CM_HOST', cfgHost)
        port = os.getenv('TASHI_CM_PORT', cfgPort)
        timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
-       socket = TSocket(host, int(port))
-       socket.setTimeout(timeout)
-       transport = TBufferedTransport(socket)
-       protocol = TBinaryProtocol(transport)
-       client = clustermanagerservice.Client(protocol)
-       transport.open()
-       client._transport = transport
-       return (client, transport)
 
+       authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+       if authAndEncrypt:
+               username = config.get('AccessClusterManager', 'username')
+               if username == '':
+                       username = raw_input('Enter Username:')
+               password = config.get('AccessClusterManager', 'password')
+               if password == '':
+                       password = getpass.getpass('Enter Password:')
+               client = rpycservices.client(host, port, username=username, 
password=password)
+       else:
+               client = rpycservices.client(host, port)
+       return client
+
 def enumToStringDict(cls):
        d = {}
        for i in cls.__dict__:
Index: src/tashi/clustermanager/clustermanager.py
===================================================================
--- src/tashi/clustermanager/clustermanager.py  (revision 792304)
+++ src/tashi/clustermanager/clustermanager.py  (working copy)
@@ -24,27 +24,41 @@
 import logging.config
 from getopt import getopt, GetoptError
 from ConfigParser import ConfigParser
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
 
-from tashi.services import clustermanagerservice
 from tashi.util import signalHandler, boolean, instantiateImplementation, 
getConfig, debugConsole
 import tashi
 
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
+
 def startClusterManager(config):
        global service, data
        
        dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), 
config)
        data = instantiateImplementation(config.get("ClusterManager", "data"), 
config)
        service = instantiateImplementation(config.get("ClusterManager", 
"service"), config, data, dfs)
-       processor = clustermanagerservice.Processor(service)
-       transport = TServerSocket(int(config.get('ClusterManagerService', 
'port')))
-       server = TThreadedServer(processor, transport)
-       
+
+       if boolean(config.get("Security", "authAndEncrypt")):
+               users = {}
+               userDatabase = data.getUsers()
+               for user in userDatabase.values():
+                       if user.passwd != None:
+                               users[user.name] = user.passwd
+               users[config.get('AllowedUsers', 'nodeManagerUser')] = 
config.get('AllowedUsers', 'nodeManagerPassword')
+               users[config.get('AllowedUsers', 'agentUser')] = 
config.get('AllowedUsers', 'agentPassword')
+               authenticator = VdbAuthenticator.from_dict(users)
+               t = ThreadedServer(service=rpycservices.ManagerService, 
hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), 
auto_register=False, authenticator=authenticator)
+       else:
+               t = ThreadedServer(service=rpycservices.ManagerService, 
hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), 
auto_register=False)
+       t.logger.quiet = True
+       t.service.service = service
+       t.service._type = 'ClusterManagerService'
+
        debugConsole(globals())
        
        try:
-               server.serve()
+               t.start()
        except KeyboardInterrupt:
                handleSIGTERM(signal.SIGTERM, None)
 
Index: src/tashi/clustermanager/clustermanagerservice.py
===================================================================
--- src/tashi/clustermanager/clustermanagerservice.py   (revision 792304)
+++ src/tashi/clustermanager/clustermanagerservice.py   (working copy)
@@ -18,28 +18,28 @@
 from datetime import datetime
 from random import randint
 from socket import gethostname
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
 import logging
 import threading
 import time
 
-from tashi.services.ttypes import Errors, InstanceState, HostState, 
TashiException
-from tashi.services import nodemanagerservice
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, 
TashiException
 from tashi import boolean, convertExceptions, ConnectionManager, vmStates, 
timed, version, scrubString
 
-def RPC(oldFunc):
-       return convertExceptions(oldFunc)
-
 class ClusterManagerService(object):
        """RPC service for the ClusterManager"""
        
        def __init__(self, config, data, dfs):
                self.config = config
                self.data = data
+               self.authAndEncrypt = boolean(config.get('Security', 
'authAndEncrypt'))
+               if self.authAndEncrypt:
+                       self.username = config.get('AccessNodeManager', 
'username')
+                       self.password = config.get('AccessNodeManager', 
'password')
+               else:
+                       self.username = None
+                       self.password = None
+               self.proxy = ConnectionManager(self.username, self.password, 
int(self.config.get('ClusterManager', 'nodeManagerPort')))
                self.dfs = dfs
-               self.proxy = ConnectionManager(nodemanagerservice.Client, 
int(self.config.get('ClusterManager', 'nodeManagerPort')))
                self.convertExceptions = 
boolean(config.get('ClusterManagerService', 'convertExceptions'))
                self.log = logging.getLogger(__name__)
                self.lastContacted = {}
@@ -181,7 +181,6 @@
                                del instance.hints[hint]
                return instance
        
-       @RPC
        def createVm(self, instance):
                """Function to add a VM to the list of pending VMs"""
                instance = self.normalize(instance)
@@ -189,7 +188,6 @@
                self.data.releaseInstance(instance)
                return instance
        
-       @RPC
        def shutdownVm(self, instanceId):
                instance = self.data.acquireInstance(instanceId)
                self.stateTransition(instance, InstanceState.Running, 
InstanceState.ShuttingDown)
@@ -202,7 +200,6 @@
                        raise
                return
        
-       @RPC
        def destroyVm(self, instanceId):
                instance = self.data.acquireInstance(instanceId)
                if (instance.state is InstanceState.Pending or instance.state 
is InstanceState.Held):
@@ -221,7 +218,6 @@
                                raise
                return
        
-       @RPC
        def suspendVm(self, instanceId):
                instance = self.data.acquireInstance(instanceId)
                self.stateTransition(instance, InstanceState.Running, 
InstanceState.Suspending)
@@ -235,7 +231,6 @@
                        raise TashiException(d={'errno':Errors.UnableToSuspend, 
'msg':'Failed to suspend %s' % (instance.name)})
                return
        
-       @RPC
        def resumeVm(self, instanceId):
                instance = self.data.acquireInstance(instanceId)
                self.stateTransition(instance, InstanceState.Suspended, 
InstanceState.Pending)
@@ -244,7 +239,6 @@
                self.data.releaseInstance(instance)
                return instance
        
-       @RPC
        def migrateVm(self, instanceId, targetHostId):
                instance = self.data.acquireInstance(instanceId)
                try:
@@ -285,7 +279,6 @@
                        raise
                return
        
-       @RPC
        def pauseVm(self, instanceId):
                instance = self.data.acquireInstance(instanceId)
                self.stateTransition(instance, InstanceState.Running, 
InstanceState.Pausing)
@@ -301,7 +294,6 @@
                self.data.releaseInstance(instance)
                return
 
-       @RPC
        def unpauseVm(self, instanceId):
                instance = self.data.acquireInstance(instanceId)
                self.stateTransition(instance, InstanceState.Paused, 
InstanceState.Unpausing)
@@ -317,23 +309,18 @@
                self.data.releaseInstance(instance)
                return
        
-       @RPC
        def getHosts(self):
                return self.data.getHosts().values()
        
-       @RPC
        def getNetworks(self):
                return self.data.getNetworks().values()
        
-       @RPC
        def getUsers(self):
                return self.data.getUsers().values()
        
-       @RPC
        def getInstances(self):
                return self.data.getInstances().values()
        
-       @RPC
        def vmmSpecificCall(self, instanceId, arg):
                instance = self.data.getInstance(instanceId)
                hostname = self.data.getHost(instance.hostId).name
@@ -345,7 +332,6 @@
                return res
        
 #      @timed
-       @RPC
        def registerNodeManager(self, host, instances):
                """Called by the NM every so often as a keep-alive/state 
polling -- state changes here are NOT AUTHORITATIVE"""
                if (host.id == None):
@@ -408,7 +394,6 @@
                        self.data.releaseHost(oldHost)
                return host.id
        
-       @RPC
        def vmUpdate(self, instanceId, instance, oldState):
                try:
                        oldInstance = self.data.acquireInstance(instanceId)
@@ -450,7 +435,6 @@
                        self.data.releaseInstance(oldInstance)
                return
        
-       @RPC
        def activateVm(self, instanceId, host):
                dataHost = self.data.acquireHost(host.id)
                if (dataHost.name != host.name):
Index: src/tashi/clustermanager/data/fromconfig.py
===================================================================
--- src/tashi/clustermanager/data/fromconfig.py (revision 792304)
+++ src/tashi/clustermanager/data/fromconfig.py (working copy)
@@ -17,7 +17,7 @@
 
 import threading
 
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data import DataInterface
 
 class FromConfig(DataInterface):
@@ -128,7 +128,7 @@
                        self.releaseLock(host._lock)
        
        def getHosts(self):
-               return self.hosts
+               return self.cleanHosts()
        
        def getHost(self, id):
                host = self.hosts.get(id, None)
Index: src/tashi/clustermanager/data/pickled.py
===================================================================
--- src/tashi/clustermanager/data/pickled.py    (revision 792304)
+++ src/tashi/clustermanager/data/pickled.py    (working copy)
@@ -18,7 +18,7 @@
 import cPickle
 import os
 import threading
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data import FromConfig, DataInterface
 
 class Pickled(FromConfig):
Index: src/tashi/clustermanager/data/sql.py
===================================================================
--- src/tashi/clustermanager/data/sql.py        (revision 792304)
+++ src/tashi/clustermanager/data/sql.py        (working copy)
@@ -19,7 +19,7 @@
 import threading
 import time
 import types
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data.datainterface import DataInterface
 from tashi.util import stringPartition, boolean
 
@@ -74,7 +74,7 @@
                self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id 
int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, 
state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT 
NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) 
NOT NULL, hints varchar(1024) NOT NULL)")
                self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id 
INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, 
decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores 
int(11), version varchar(256))")
                self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id 
int(11) NOT NULL, name varchar(256) NOT NULL)")
-               self.executeStatement("CREATE TABLE IF NOT EXISTS users (id 
int(11) NOT NULL, name varchar(256) NOT NULL)")
+               self.executeStatement("CREATE TABLE IF NOT EXISTS users (id 
int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
        
        def sanitizeForSql(self, s):
                if (s == '"True"'):
@@ -247,12 +247,12 @@
                res = cur.fetchall()
                users = {}
                for r in res:
-                       user = User(d={'id':r[0], 'name':r[1]})
+                       user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
                        users[user.id] = user
                return users
        
        def getUser(self, id):
                cur = self.executeStatement("SELECT * FROM users WHERE id = %d" 
% (id))
                r = cur.fetchone()
-               user = User(d={'id':r[0], 'name':r[1]})
+               user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
                return user
Index: src/tashi/clustermanager/data/ldapoverride.py
===================================================================
--- src/tashi/clustermanager/data/ldapoverride.py       (revision 792304)
+++ src/tashi/clustermanager/data/ldapoverride.py       (working copy)
@@ -17,7 +17,7 @@
 
 import subprocess
 import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
 from tashi.clustermanager.data import DataInterface
 from tashi.util import instantiateImplementation
 
Index: src/tashi/clustermanager/data/getentoverride.py
===================================================================
--- src/tashi/clustermanager/data/getentoverride.py     (revision 792304)
+++ src/tashi/clustermanager/data/getentoverride.py     (working copy)
@@ -17,7 +17,7 @@
 
 import subprocess
 import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
 from tashi.clustermanager.data import DataInterface
 from tashi.util import instantiateImplementation
 
Index: src/tashi/client/tashi-client.py
===================================================================
--- src/tashi/client/tashi-client.py    (revision 792304)
+++ src/tashi/client/tashi-client.py    (working copy)
@@ -21,12 +21,7 @@
 import random
 import sys
 import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import *
 from tashi import vmStates, hostStates, boolean, getConfig, stringPartition, 
createClient
 
 users = {}
@@ -46,7 +41,10 @@
 
 def getUser():
        fetchUsers()
-       userStr = os.getenv("USER", "unknown")
+       if client.username != None:
+               userStr = client.username
+       else:
+               userStr = os.getenv("USER", "unknown")
        for user in users:
                if (users[user].name == userStr):
                        return users[user].id
@@ -66,7 +64,7 @@
                raise ValueError("Unknown instance %s" % (str(instance)))
        for instance in instances:
                if (instance.id == instanceId):
-                       if (instance.userId != userId and instance.userId != 
None):
+                       if (instance.userId != userId and instance.userId != 
None and userId != 0):
                                raise ValueError("You don't own that VM")
        return instanceId
 
@@ -461,7 +459,7 @@
                        usage(function)
        try:
                vals = {}
-               (client, transport) = createClient(config)
+               client = createClient(config)
                for parg in possibleArgs:
                        (parg, conv, default, required) = parg
                        val = None
@@ -498,8 +496,6 @@
                print "TashiException:"
                print e.msg
                exitCode = e.errno
-       except TTransportException, e:
-               print e
        except Exception, e:
                print e
                usage(function)
Index: etc/TashiDefaults.cfg
===================================================================
--- etc/TashiDefaults.cfg       (revision 792304)
+++ etc/TashiDefaults.cfg       (working copy)
@@ -15,6 +15,27 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+[Security]
+authAndEncrypt = False
+
+[AccessClusterManager]
+#If username and password are left empty, user will be prompted for username 
and password on the command line.
+username = nodemanager
+password = nodemanager
+
+[AccessNodeManager]
+#If username and password are left empty, user will be prompted for username 
and password on the command line.
+username = clustermanager
+password = clustermanager
+
+[AllowedUsers]
+nodeManagerUser = nodemanager
+nodeManagerPassword = nodemanager
+agentUser = agent
+agentPassword = agent
+clusterManagerUser = clustermanager
+clusterManagerPassword = clustermanager
+
 # ClusterManager portion
 [ClusterManager]
 service = tashi.clustermanager.ClusterManagerService

Reply via email to