Forgot to mention: requires numpy and scipy. I'll work on getting rid of those requirements.

On Apr 14, 2009, at 3:33 PM, Jim Cipar wrote:

I've been working on the Tashi locality server. It exports a service that returns the hop count between any nodes that it knows about. An HDFS client running in a VM could use this, for example, to decide which node to read data from. That use case requires modifying the HDFS client code, but it shows how a service like this could be useful.
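
To make the HDFS use case concrete, here is a minimal sketch of how a client might pick a replica from one row of the hop count matrix. The function name pick_nearest, the host names, and the hop values are all hypothetical; none of this is in the patch, and it is written for Python 3.

```python
import math

def pick_nearest(hop_row, replicas):
    """Return the replica with the smallest hop count, or None if none is reachable."""
    best = None
    best_hops = math.inf
    for hops, replica in zip(hop_row, replicas):
        # Strict comparison: an entry of inf (unknown or unreachable) never wins.
        if hops < best_hops:
            best, best_hops = replica, hops
    return best

# One source (the client's VM) against three hypothetical replica hosts.
# The values are made up for illustration.
replicas = ["host1", "host3", "host9"]
hop_row = [2.0, 4.0, math.inf]
nearest = pick_nearest(hop_row, replicas)
print(nearest)
```

Returning None when every entry is infinite leaves the fallback policy (for example, picking any replica) to the caller.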

The only call exported right now is:

list<list<double>> getHopCountMatrix(list<string> sources, list<string> destinations)

This returns the complete hop count matrix between sources and destinations, with positive infinity for any pair in which a node is unknown or the two nodes are not connected. The service reads static network objects from a configuration file that describes which things (nodes, switches, ...) in the network are adjacent. Each line of the file has one source and any number of destinations. For instance, a simple cluster with one top-level switch and two switches, each with two hosts, might be configured like this:

toplevel switch1 switch2
switch1 toplevel host1 host2
switch2 toplevel host3 host4
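
As an illustration of the file format, the sketch below parses the example above into an undirected adjacency map. It is a simplified Python 3 approximation of what graphFromFile in the patch does: the name parse_layout is hypothetical, and it skips the DNS canonicalization step that the patch applies to each name.

```python
def parse_layout(text):
    """Build an undirected adjacency map from layout-file text."""
    graph = {}
    for line in text.splitlines():
        names = line.split()
        if not names:
            continue  # skip blank lines
        source, peers = names[0], names[1:]
        graph.setdefault(source, set())
        for peer in peers:
            # Record the link in both directions.
            graph.setdefault(peer, set()).add(source)
            graph[source].add(peer)
    return graph

layout = """\
toplevel switch1 switch2
switch1 toplevel host1 host2
switch2 toplevel host3 host4
"""
graph = parse_layout(layout)
print(sorted(graph["switch1"]))
```

Because links are stored in both directions, listing toplevel on the switch1 line is redundant with listing switch1 on the toplevel line, but harmless.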

All physical objects on the network are configured via the configuration file. Information about currently running Tashi VMs is gathered automatically from the Tashi Cluster Manager, and the VMs are added to the graph.
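
The patch computes hop counts over the combined graph by repeated min-plus squaring of the adjacency matrix, using numpy (shortestPaths and genMul in locality-server.py). The sketch below shows the same idea in plain Python 3 without numpy. The names hop_counts and min_plus_square, the VM vm1, and the disconnected node island are hypothetical and only there for illustration.

```python
import math

INF = math.inf

def min_plus_square(d):
    """One min-plus 'multiplication' of d with itself."""
    n = len(d)
    return [[min(d[i][k] + d[k][j] for k in range(n)) for j in range(n)]
            for i in range(n)]

def hop_counts(nodes, edges):
    """Return (index, matrix) of shortest hop counts between all nodes."""
    index = {name: i for i, name in enumerate(nodes)}
    n = len(nodes)
    # 0 on the diagonal, 1 for adjacent nodes, inf everywhere else.
    d = [[0.0 if i == j else INF for j in range(n)] for i in range(n)]
    for a, b in edges:
        d[index[a]][index[b]] = 1.0
        d[index[b]][index[a]] = 1.0
    # After k squarings, all paths of up to 2**k hops are covered.
    steps = math.ceil(math.log2(n)) if n > 1 else 0
    for _ in range(steps):
        d = min_plus_square(d)
    return index, d

nodes = ["toplevel", "switch1", "switch2",
         "host1", "host2", "host3", "host4", "vm1", "island"]
edges = [("toplevel", "switch1"), ("toplevel", "switch2"),
         ("switch1", "host1"), ("switch1", "host2"),
         ("switch2", "host3"), ("switch2", "host4"),
         ("host1", "vm1")]  # a VM is attached to the host it runs on
index, d = hop_counts(nodes, edges)
print(d[index["vm1"]][index["host3"]])
```

Each squaring is O(n^3), so this is only reasonable for small graphs; a breadth-first search from each source would be an alternative for larger ones.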

Lastly, there is a script (src/utils/getLocality.py) that can be used to contact this service. The script reads two lines from stdin: a whitespace-separated list of sources and a whitespace-separated list of destinations. It prints the distance matrix with one row per source and one column per destination, then waits for two more lines of input. It continues until it receives EOF on stdin.
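
The input protocol can be sketched on its own, separate from the Thrift call. The Python 3 snippet below is an illustration only: read_queries is a hypothetical helper, not part of the patch, and it reads from an in-memory stream instead of stdin.

```python
import io

def read_queries(stream):
    """Yield (sources, destinations) pairs until EOF, skipping blank lines."""
    pending = []
    for line in stream:
        names = line.split()  # split on any run of whitespace
        if not names:
            continue
        pending.append(names)
        if len(pending) == 2:
            yield pending[0], pending[1]
            pending = []
    # A lone sources line at EOF is dropped, since it has no destinations.

fake_stdin = io.StringIO(
    "host1 host2\n"
    "host3 host4\n"
    "\n"
    "host1\n"
    "host2 host3 host4\n"
)
queries = list(read_queries(fake_stdin))
print(queries)
```

Each yielded pair corresponds to one getHopCountMatrix request.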

Here's the patch:



Index: trunk/src/utils/getLocality.py
===================================================================
--- trunk/src/utils/getLocality.py      (revision 0)
+++ trunk/src/utils/getLocality.py      (revision 0)
@@ -0,0 +1,53 @@
+#!/usr/bin/python
+
+import sys
+import os
+from os import system
+
+import tashi.services.layoutlocality.localityservice as localityservice
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+
+from tashi.util import getConfig
+
+(config, configFiles) = getConfig(["Client"])
+host = config.get('LocalityService', 'host')
+port = int(config.get('LocalityService', 'port'))
+
+socket = TSocket.TSocket(host, port)
+transport = TTransport.TBufferedTransport(socket)
+protocol = TBinaryProtocol.TBinaryProtocol(transport)
+client = localityservice.Client(protocol)
+transport.open()
+
+
+while True:
+    line1 = "\n"
+    line2 = "\n"
+    while line1 != "":
+        line1 = sys.stdin.readline()
+        if line1 == "":
+            sys.exit(0)
+        if line1 != "\n":
+            break
+    line1 = line1.strip()
+    while line2 != "":
+        line2 = sys.stdin.readline()
+        if line2 == "":
+            sys.exit(0)
+        if line2 != "\n":
+            break
+    line2 = line2.strip()
+
+    sources = line1.split()
+    destinations = line2.split()
+
+    mat = client.getHopCountMatrix(sources, destinations)
+    for r in mat:
+        for c in r:
+            print '%f\t'%c,
+        print '\n',
+    print '\n',

Property changes on: trunk/src/utils/getLocality.py
___________________________________________________________________
Added: svn:executable
  + *

Index: trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
===================================================================
--- trunk/src/tashi/nodemanager/vmcontrol/xenpv.py      (revision 764908)
+++ trunk/src/tashi/nodemanager/vmcontrol/xenpv.py      (working copy)
@@ -156,9 +156,9 @@
                            image, macAddr, memory, cores):
                fn = os.path.join("/tmp", vmName)
                cfgstr = """
-# kernel="/boot/vmlinuz-2.6.24-17-xen"
-# ramdisk="/boot/initrd.img-2.6.24-17-xen"
-bootloader="/usr/bin/pygrub"
+kernel="/boot/vmlinuz-2.6.24-17-xen"
+ramdisk="/boot/initrd.img-2.6.24-17-xen"
+# bootloader="/usr/bin/pygrub"
disk=['tap:qcow:%s,xvda1,w']
vif = [ 'mac=%s' ]
# vif = ['ip=172.19.158.1']
Index: trunk/src/tashi/thrift/build.py
===================================================================
--- trunk/src/tashi/thrift/build.py     (revision 764908)
+++ trunk/src/tashi/thrift/build.py     (working copy)
@@ -35,6 +35,7 @@
                print 'Removing \'../messaging/messagingthrift\' directory...'
                shutil.rmtree('../messaging/messagingthrift')
        
+       
        print 'Generating Python code for \'services.thrift\'...'
        os.system('thrift --gen py:new_style services.thrift')
        
@@ -44,7 +45,12 @@
        print 'Generatign Python code for \'messagingthrift\'...'
        os.system('rm -rf gen-py')
        os.system('thrift --gen py messagingthrift.thrift')
-       
+
+        print 'Generating Python code for \'layoutlocality.thrift\'...'
+        os.system('thrift --gen py:new_style layoutlocality.thrift')
+        print 'Copying generated code to \'tashi.services\' package...'
+        shutil.copytree('gen-py/layoutlocality', '../services/layoutlocality')
+
        print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
        shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
                        os.path.join('..', 'messaging', 'messagingthrift'))
Index: trunk/src/tashi/thrift/layoutlocality.thrift
===================================================================
--- trunk/src/tashi/thrift/layoutlocality.thrift        (revision 0)
+++ trunk/src/tashi/thrift/layoutlocality.thrift        (revision 0)
@@ -0,0 +1,26 @@
+struct BlockLocation {
+  list<string> hosts,           // hostnames of data nodes
+  list<i32> ports,              // ports for data nodes
+  list<string> names,           // hostname:port of data nodes
+  i64 blocknum,
+  i64 offset,
+  i64 length
+}
+
+struct Pathname {
+  string pathname
+}
+
+exception FileNotFoundException {
+  string message
+}
+
+service layoutservice {
+
+ list <BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 offset, 3:i64 length)
+                       throws (1:FileNotFoundException ouch),
+}
+
+service localityservice {
+ list <list<double>> getHopCountMatrix(1:list<string> sourceHosts, 2:list<string> destHosts),
+}
\ No newline at end of file
Index: trunk/src/tashi/agents/locality-server.py
===================================================================
--- trunk/src/tashi/agents/locality-server.py   (revision 0)
+++ trunk/src/tashi/agents/locality-server.py   (revision 0)
@@ -0,0 +1,213 @@
+from socket import gethostname
+import os
+import threading
+import time
+import socket
+
+from tashi.services.ttypes import *
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from tashi.services import clustermanagerservice
+from tashi.util import getConfig
+from tashi.parallel import *
+
+import tashi.services.layoutlocality.localityservice as localityservice
+
+from numpy import *
+from scipy import *
+
+cnames = {}
+def cannonicalName(hn):
+    try:
+        if cnames.has_key(hn):
+            return cnames[hn]
+        r = socket.gethostbyname_ex(hn)[0]
+        cnames[hn] = r
+        return r
+    except:
+        return hn
+
+
+def createClient(config):
+    host = config.get('Client', 'clusterManagerHost')
+    port = config.get('Client', 'clusterManagerPort')
+    print host, port
+    timeout = float(config.get('Client', 'clusterManagerTimeout')) * 1000.0
+
+    socket = TSocket.TSocket(host, int(port))
+    socket.setTimeout(timeout)
+    transport = TTransport.TBufferedTransport(socket)
+    protocol = TBinaryProtocol.TBinaryProtocol(transport)
+    client = clustermanagerservice.Client(protocol)
+    try:
+        transport.open()
+    except:
+        pass
+    return (client, transport)
+
+
+def genMul(A, B, add, mult):
+    '''generalized matrix multiplication'''
+    C = zeros((shape(A)[0], shape(B)[1]))
+    for i in range(shape(C)[0]):
+        for j in range(shape(C)[1]):
+            C[i,j] = add(mult(A[i,:], B[:,j]))
+    return C
+
+def addHost(graph, hostVals, host):
+    if not graph.has_key(host):
+        graph[host] = []
+    if not hostVals.has_key(host):
+        hostVals[host] = len(hostVals)
+
+def graphConnect(graph, h1, h2):
+    if not h1 in graph[h2]:
+        graph[h2].append(h1)
+    if not h2 in graph[h1]:
+        graph[h1].append(h2)
+
+def graphFromFile(fn = 'serverLayout', graph = {}, hostVals = {}):
+    f = open(fn)
+    for line in f.readlines():
+        line = line.split()
+        if len(line) < 1:
+            continue
+        server = cannonicalName(line[0].strip())
+
+        addHost(graph, hostVals, server)
+        for peer in line[1:]:
+            peer = cannonicalName(peer.strip())
+            addHost(graph, hostVals, peer)
+            graphConnect(graph, server, peer)
+    return graph, hostVals
+
+def graphFromTashi(client, transport, graph={}, hostVals={}):
+    print 'getting graph'
+    if not transport.isOpen():
+        transport.open()
+    hosts = client.getHosts()
+    instances = client.getInstances()
+    for instance in instances:
+        host = [cannonicalName(h.name) for h in hosts if h.id == instance.hostId]
+        if len(host) <1 :
+            print 'cant find vm host'
+            continue
+        host = host[0]
+        print 'host is ', host
+        addHost(graph, hostVals, host)
+        print 'added host'
+        vmhost = cannonicalName(instance.name)
+        addHost(graph, hostVals, vmhost)
+        print 'added vm'
+        graphConnect(graph, host, vmhost)
+        print 'connected'
+    print 'returning from graphFromTashi'
+    return graph, hostVals
+
+
+
+def graphToArray(graph, hostVals):
+    a = zeros((len(hostVals), len(hostVals)))
+    for host in graph.keys():
+        if not hostVals.has_key(host):
+            continue
+        a[hostVals[host], hostVals[host]] = 1
+        for peer in graph[host]:
+            if not hostVals.has_key(peer):
+                continue
+            a[hostVals[host], hostVals[peer]] = 1
+    a[a==0] = inf
+    for i in range(shape(a)[0]):
+        a[i,i]=0
+    return a
+
+def shortestPaths(graphArray):
+    a = graphArray
+    for i in range(int(math.ceil(math.log(shape(a)[0],2)))):
+        a = genMul(a,a,min,plus)
+    return a
+
+def plus(A, B):
+    return A + B
+
+
+def getHopCountMatrix(sourceHosts, destHosts, array, hostVals):
+    a = zeros((len(sourceHosts), len(destHosts)))
+    a[a==0] = inf
+    for i in range(len(sourceHosts)):
+        sh = cannonicalName(sourceHosts[i])
+        shv = None
+        if hostVals.has_key(sh):
+            shv = hostVals[sh]
+        else:
+            print 'host not found', sh
+            continue
+        for j in range(len(destHosts)):
+            dh = cannonicalName(destHosts[j])
+            dhv = None
+            if hostVals.has_key(dh):
+                dhv = hostVals[dh]
+            else:
+                print 'dest not found', dh
+                continue
+            print sh, dh, i,j, shv, dhv, array[shv, dhv]
+            a[i,j] = array[shv, dhv]
+    return a
+
+
+class LocalityService:
+    def __init__(self):
+        (config, configFiles) = getConfig(["Agent"])
+        self.port = int(config.get('LocalityService', 'port'))
+        print 'Locality service on port %i' % self.port
+        self.processor = localityservice.Processor(self)
+        self.transport = TSocket.TServerSocket(self.port)
+        self.tfactory = TTransport.TBufferedTransportFactory()
+        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+        self.server = TServer.TThreadedServer(self.processor,
+                                              self.transport,
+                                              self.tfactory,
+                                              self.pfactory)
+
+        self.hostVals = {}
+        self.array = array([[]])
+        self.rtime = 0
+
+
+        self.fileName = os.path.expanduser(config.get("LocalityService", "staticLayout"))
+        (self.client, self.transport) = createClient(config)
+
+        self.server.serve()
+
+    @synchronizedmethod
+    def refresh(self):
+        if time.time() - self.rtime < 10:
+            return
+        g, self.hostVals = graphFromFile(self.fileName)
+        try:
+            g, self.hostVals = graphFromTashi(self.client, self.transport, g, self.hostVals)
+        except Exception, e:
+            print e
+            print 'could not get instance list from cluster manager'
+        print 'graph to array'
+        a = graphToArray(g, self.hostVals)
+        print 'calling shortest paths ', a.shape
+        self.array = shortestPaths(a)
+        print 'computed shortest paths'
+        print self.array
+        print self.hostVals
+    @synchronizedmethod
+    def getHopCountMatrix(self, sourceHosts, destHosts):
+        self.refresh()
+        print 'getting hop count matrix for', sourceHosts, destHosts
+        hcm = getHopCountMatrix(sourceHosts, destHosts, self.array, self.hostVals)
+        print hcm
+        return hcm
+
+
+ls = LocalityService()
Index: trunk/etc/TashiDefaults.cfg
===================================================================
--- trunk/etc/TashiDefaults.cfg (revision 764908)
+++ trunk/etc/TashiDefaults.cfg (working copy)
@@ -82,6 +82,11 @@
[Vfs]
prefix = /var/tmp/

+[LocalityService]
+host = layoutserver
+port = 9884
+staticLayout = /location/of/layout/file
+
# Client configuration
[Client]
clusterManagerHost = localhost

