Hello everyone,
I am new to python and trying to run an example code from mininet tests.
Basically, I am trying to call a method in Hcontroller.py from base class
Routing defined in DCRouting.py which runs and fetches all the required
results in install_reactive_path() method, but it returns None when it is
called from _GlobalFirstFit. I hope someone here could help me fix this
bug..
I am attaching all the three files(DCRouting.py, HController.py, util.py)
to have a look into. Thanks in advance for your time, help or suggestion.
Thanks a lot!
kind regards,
David
''' Simple hashed-based routing
@author: Milad Sharif ([email protected])
'''
import logging
from copy import copy
class Routing(object):
'''Base class for data center network routing.
Routing engines must implement the get_route() method.
'''
def __init__(self, topo):
'''Create Routing object.
@param topo Topo object from Net parent
'''
self.topo = topo
def get_route(self, src, dst, hash_):
'''Return flow path.
@param src source host
@param dst destination host
@param hash_ hash value
@return flow_path list of DPIDs to traverse (including hosts)
'''
raise NotImplementedError
def routes(self, src, dst):
''' Return list of paths
Only works for Fat-Tree topology
@ param src source host
@ param dst destination host
@ return list of DPIDs (including inputs)
'''
complete_paths = [] # List of complete dpid routes
src_paths = { src : [[src]] }
dst_paths = { dst : [[dst]] }
dst_layer = self.topo.layer(dst)
src_layer = self.topo.layer(src)
lower_layer = src_layer
if dst_layer > src_layer:
lower_layer = dst_layer
for front_layer in range(lower_layer-1, -1, -1):
if src_layer > front_layer:
# expand src frontier
new_src_paths = {}
for node in sorted(src_paths):
path_list = src_paths[node]
for path in path_list:
last_node = path[-1]
for frontier_node in self.topo.upper_nodes(last_node):
new_src_paths[frontier_node] = [path + [frontier_node]]
if frontier_node in dst_paths:
dst_path_list = dst_paths[frontier_node]
for dst_path in dst_path_list:
dst_path_copy = copy ( dst_path )
dst_path_copy.reverse()
complete_paths.append( path + dst_path_copy)
src_paths = new_src_paths
if dst_layer > front_layer:
# expand dst frontier
new_dst_paths = {}
for node in sorted(dst_paths):
path_list = dst_paths[node]
for path in path_list:
last_node = path[-1]
for frontier_node in self.topo.upper_nodes(last_node):
new_dst_paths[frontier_node] = [ path + [frontier_node]]
if frontier_node in src_paths:
src_path_list = src_paths[frontier_node]
dst_path_copy = copy( path )
dst_path_copy.reverse()
for src_path in src_path_list:
complete_paths.append( src_path + dst_path_copy)
dst_paths = new_dst_paths
if complete_paths:
return complete_paths
class HashedRouting(Routing):
''' Hashed routing '''
def __init__(self, topo):
self.topo = topo
def get_route(self, src, dst, hash_):
''' Return flow path. '''
if src == dst:
return [src]
paths = self.routes(src,dst)
if paths:
#print 'hash_:', hash_
choice = hash_ % len(paths)
#print 'choice:', choice
path = sorted(paths)[choice]
#print 'path:', path
return path
#!/usr/bin/python
'''
Fat tree topology for data center networking
@author Milad Sharif ([email protected])
'''
from mininet.topo import Topo
class FatTreeNode(object):
def __init__(self, pod = 0, sw = 0, host = 0, name = None, dpid = None):
''' Create FatTreeNode '''
if dpid:
self.pod = ( dpid & 0xff0000 ) >> 16
self.sw = ( dpid & 0xff00 ) >> 8
self.host = ( dpid & 0xff )
self.dpid = dpid
else:
if name:
pod, sw, host = [int(s) for s in name.split('h')]
self.pod = pod
self.sw = sw
self.host = host
self.dpid = (pod << 16) + (sw << 8) + host
def name_str(self):
''' Return name '''
return "%ih%ih%i" % (self.pod, self.sw, self.host)
def ip_str(self):
''' Return IP address '''
return "10.%i.%i.%i" % (self.pod, self.sw, self.host)
def mac_str(self):
''' Return MAC address '''
return "00:00:00:%02x:%02x:%02x" % (self.pod, self.sw, self.host)
class NonBlockingTopo(Topo):
LAYER_CORE = 0
LAYER_HOST = 3
def __init__(self, k=4):
''' Create a non-bloking switch '''
super(NonBlockingTopo, self).__init__()
self.k = k
self.node_gen = FatTreeNode
pods = range(0, k)
edge_sw = range(0, k/2)
agg_sw = range(k/2, k)
hosts = range(2, k/2+2)
core = self.node_gen(k, 1, 1)
core_opts = self.def_opts(core.name_str())
self.addSwitch(core.name_str(), **core_opts)
for p in pods:
for e in edge_sw:
for h in hosts:
host = self.node_gen(p,e,h)
host_opts = self.def_opts(host.name_str())
self.addHost(host.name_str(), **host_opts)
self.addLink(host.name_str(), core.name_str())
def layer(self, name):
''' Return the layer of a node '''
node = self.node_gen(name = name)
if (node.pod == self.k):
layer = self.LAYER_CORE
else:
layer = self.LAYER_HOST
return layer
def def_opts(self, name):
''' return default dict for FatTree node '''
node = self.node_gen(name = name)
d = {'layer': self.layer(name)}
if d['layer'] == self.LAYER_HOST:
d.update({'ip': node.ip_str()})
d.update({'mac': node.mac_str()})
d.update({'dpid': "%016x" % node.dpid})
return d
class FatTreeTopo(Topo):
LAYER_CORE = 0
LAYER_AGG = 1
LAYER_EDGE = 2
LAYER_HOST = 3
def __init__(self, k = 4):
''' Create FatTree topology
k : Number of pods (can support upto k^3/4 hosts)
'''
super(FatTreeTopo, self).__init__()
self.k = k
self.node_gen = FatTreeNode
self.numPods = k
self.aggPerPod = k / 2
pods = range(0, k)
edge_sw = range(0, k/2)
agg_sw = range(k/2, k)
core_sw = range(1, k/2+1)
hosts = range(2, k/2+2)
for p in pods:
for e in edge_sw:
edge = self.node_gen(p, e, 1)
edge_opts = self.def_opts(edge.name_str())
self.addSwitch(edge.name_str(), **edge_opts)
for h in hosts:
host = self.node_gen(p, e, h)
host_opts = self.def_opts(host.name_str())
self.addHost(host.name_str(), **host_opts)
self.addLink(edge.name_str(),host.name_str())
for a in agg_sw:
agg = self.node_gen(p, a, 1)
agg_opts = self.def_opts(agg.name_str())
self.addSwitch(agg.name_str(), **agg_opts)
self.addLink(agg.name_str(),edge.name_str())
for a in agg_sw:
agg = FatTreeNode(p, a, 1)
for c in core_sw:
core = self.node_gen(k, a-k/2+1, c)
core_opts = self.def_opts(core.name_str())
self.addSwitch(core.name_str(), **core_opts)
self.addLink(agg.name_str(),core.name_str())
def layer(self, name):
''' Return layer of node '''
node = self.node_gen(name = name)
if (node.pod == self.k):
layer = self.LAYER_CORE
elif (node.host == 1):
if (node.sw < self.k/2):
layer = self.LAYER_EDGE
else:
layer = self.LAYER_AGG
else:
layer = self.LAYER_HOST
return layer
def isPortUp(self, port):
if port > (self.k/2):
return True
else:
return False
def layer_nodes(self, layer):
''' Return nodes at the given layer '''
return [n for n in self.g.nodes() if self.layer(n) == layer]
def upper_nodes(self, name):
''' Return nodes at one layer higher(closer to core) '''
layer = self.layer(name) - 1
return [n for n in self.g[name] if self.layer(n) == layer]
def lower_nodes(self, name):
'''Return edges one layer lower (closer to hosts) '''
layer = self.layer(name) + 1
return [n for n in self.g[name] if self.layer(n) == layer]
def def_opts(self, name):
''' return default dict for FatTree node '''
node = self.node_gen(name = name)
d = {'layer': self.layer(name)}
if d['layer'] == self.LAYER_HOST:
d.update({'ip': node.ip_str()})
d.update({'mac': node.mac_str()})
d.update({'dpid': "%016x" % node.dpid})
return d
'''the demand estimation algorithm from Hedera paper is implemented here'''
import random
#import pdb
def demand_estimation(flows, hostsList):
M ={}
for i in hostsList:
M[i] = {}
for j in hostsList:
M[i][j] = {'demand': 0, 'demandInit': 0, 'converged' : False, 'FlowNmbr' : 0}
for flow in flows:
M[flow['src']][flow['dst']]['FlowNmbr'] += 1
print 'Moujo: ', M
demandChange = True
while demandChange:
demandChange = False
for src in hostsList:
Est_Src(M, flows, src)
for dst in hostsList:
Est_Dst(M, flows, dst)
for i in hostsList:
for j in hostsList:
if M[i][j]['demandInit'] != M[i][j]['demand']:
NoChange = True
M[i][j]['demandInit'] = M[i][j]['demand']
print"********************estimated demands*********************\n", demandsPrinting(M,hostsList)
return (M, flows)
def Est_Src(M, flows, src):
dF = 0
nU = 0
for flow in flows:
if flow['src'] == src:
if flow['converged']:
dF += flow['demand']
else:
nU += 1
if nU != 0:
eS = (1.0 - dF) / nU
for flow in flows:
if flow['src'] == src and not flow['converged']:
M[flow['src']][flow['dst']]['demand'] = eS
#pdb.set_trace()
flow['demand'] = eS
def Est_Dst(M, flows, dst):
dT = 0
dS = 0
nR = 0
for flow in flows:
if flow['dst'] == dst:
flow['recLimited'] = True
dT += flow['demand']
nR += 1
if dT <= 1.0:
return
eS = 1.0 / nR
flagFlip=True
while flagFlip:
flagFlip = False
nR = 0
for flow in flows:
if flow['dst'] == dst and flow['recLimited']:
if flow['demand'] < eS:
dS += flow['demand']
flow['recLimited'] = False
flagFlip = True
else:
nR += 1
eS = (1.0-dS)/nR
for flow in flows:
if flow['dst'] == dst and flow['recLimited']:
M[flow['src']][flow['dst']]['demand'] = eS
M[flow['src']][flow['dst']]['converged'] = True
flow['converged'] = True
flow['demand'] = eS
def demandsPrinting(M,hostsList):
print hostsList, '\n', '_'*80
for row in hostsList:
#pdb.set_trace()
print row,'|',
for col in hostsList:
temp = M[row][col]
print '%.2f' % temp['demand'],
print
def makeFlows(flows, src, dsts):
demand = 0.2 / len(dsts)
for dst in dsts:
flows.append({'converged': False, 'demand': demand, 'src': src, 'dst': dst, 'recLimited': False})
if __name__ == '__main__':
hostsList = range(15)
flows = []
for i in range(15):
dst = random.randint(0,14)
if dst > 6:
makeFlows(flows, i, [dst, dst])
else:
makeFlows(flows, i, [dst, dst+1])
#pdb.set_trace()
M, flows_estimated = demand_estimation(flows, hostsList)
demandsPrinting(M,hostsList)
#pdb.set_trace()
''' Hedera data center controller
@author: Behnam Montazeri ([email protected])
'''
import logging
import sys
sys.path.append('/home/juno/msharif-h/')
from struct import pack
from zlib import crc32
from pox.core import core
import pox.openflow.libopenflow_01 as of
from pox.lib.revent import EventMixin
from pox.lib.util import dpidToStr
from pox.lib.packet.ipv4 import ipv4
from pox.lib.packet.udp import udp
from pox.lib.packet.tcp import tcp
from util import buildTopo, getRouting
from DemandEstimation import demand_estimation
from threading import Timer, Lock
log = core.getLogger()
# Number of bytes to send for packet_ins
MISS_SEND_LEN = 2000
class Switch(EventMixin):
def __init__(self):
self.connection = None
self.dpid = None
self.ports = None
def connect(self, connection):
if self.dpid is None:
self.dpid = connection.dpid
assert self.dpid == connection.dpid
self.connection = connection
def send_packet_data(self, outport, data = None):
msg = of.ofp_packet_out(in_port=of.OFPP_NONE, data = data)
msg.actions.append(of.ofp_action_output(port = outport))
self.connection.send(msg)
def send_packet_bufid(self, outport, buffer_id = -1):
msg = of.ofp_packet_out(in_port=of.OFPP_NONE)
msg.actions.append(of.ofp_action_output(port = outport))
msg.buffer_id = buffer_id
self.connection.send(msg)
def install(self, port, match, buf = -1, deleteFlow=False, idle_timeout = 0 ):
msg = of.ofp_flow_mod()
msg.match = match
msg.idle_timeout = idle_timeout
msg.actions.append(of.ofp_action_output(port = port))
if deleteFlow:
msg.command = of.OFPFC_DELETE
#msg.buffer_id = buf
msg.flags = of.OFPFF_SEND_FLOW_REM
self.connection.send(msg)
class HController(EventMixin):
def __init__(self, t, r, bw):
self.switches = {} # [dpid]->switch
self.macTable = {} # [mac]->(dpid, port)
self.t = t # Topo object
self.r = r # Routng object
self.all_switches_up = False
core.openflow.addListeners(self)
self.statCntr = 0 #sanity check for the flow stats
self.HostNameList = [] #a dictionary of the host
self.hostsList = [] #list of host dpid
self.flows = [] #list of the collected stats
self.bw = bw
self.beReservation = {} #reservation for the elephant flows
self.statMonitorLock = Lock() #to lock the multi access threads
self.statMonitorLock.acquire()
statMonitorTimer = Timer(10.0,self._collectFlowStats()) #timer to collect stats
statMonitorTimer.start()
self.matchDict = {} # dictioanary of the matches
def _ecmp_hash(self, packet):
''' Return an ECMP-style 5-tuple hash for TCP/IP packets, otherwise 0.
RFC2992 '''
hash_input = [0] * 5
if isinstance(packet.next, ipv4):
ip = packet.next
hash_input[0] = ip.srcip.toUnsigned()
hash_input[1] = ip.dstip.toUnsigned()
hash_input[2] = ip.protocol
if isinstance(ip.next, tcp) or isinstance(ip.next, udp):
l4 = ip.next
hash_input[3] = l4.srcport
hash_input[4] = l4.dstport
return crc32(pack('LLHHH', *hash_input))
return 0
def _flood(self, event):
''' Broadcast to every output port '''
packet = event.parsed
dpid = event.dpid
in_port = event.port
t = self.t
# Broadcast to every output port except the input on the input switch.
for sw_name in t.layer_nodes(t.LAYER_EDGE):
for host_name in t.lower_nodes(sw_name):
sw_port, host_port = t.port(sw_name, host_name)
sw = t.node_gen(name = sw_name).dpid
# Send packet out each non-input host port
if sw != dpid or (sw == dpid and in_port != sw_port):
self.switches[sw].send_packet_data(sw_port, event.data)
def _install_reactive_path(self, event, out_dpid, final_out_port, packet):
''' Install entries on route between two switches. '''
in_name = self.t.node_gen(dpid = event.dpid).name_str()
out_name = self.t.node_gen(dpid = out_dpid).name_str()
hash_ = self._ecmp_hash(packet)
route = self.r.get_route(in_name, out_name, hash_)
print "Route:",route
print '-'*80
if route == None:
print None, "route between", in_name, "and", out_name
return
match = of.ofp_match.from_packet(packet)
for i, node in enumerate(route):
node_dpid = self.t.node_gen(name = node).dpid
if i < len(route) - 1:
next_node = route[i + 1]
out_port, next_in_port = self.t.port(node, next_node)
else:
out_port = final_out_port
self.switches[node_dpid].install(out_port, match, idle_timeout = 10)
if isinstance(packet.next, of.ipv4) and isinstance(packet.next.next, of.tcp):
self.matchDict[(packet.next.srcip, packet.next.dstip, packet.next.next.srcport, packet.next.next.dstport)] = (route, match)
def _handle_PacketIn(self, event):
if not self.all_switches_up:
#log.info("Saw PacketIn before all switches were up - ignoring." )
return
packet = event.parsed
dpid = event.dpid
in_port = event.port
# Learn MAC address of the sender on every packet-in.
self.macTable[packet.src] = (dpid, in_port)
sw_name = self.t.node_gen(dpid = dpid).name_str()
#print "Sw:", sw_name, packet.src, packet.dst,"port", in_port, packet.dst.isMulticast(),"macTable", packet.dst in self.macTable
#print '-'*80
# Insert flow, deliver packet directly to destination.
if packet.dst in self.macTable:
out_dpid, out_port = self.macTable[packet.dst]
self._install_reactive_path(event, out_dpid, out_port, packet)
self.switches[out_dpid].send_packet_data(out_port, event.data)
else:
self._flood(event)
def _handle_ConnectionUp(self, event):
sw = self.switches.get(event.dpid)
sw_str = dpidToStr(event.dpid)
sw_name = self.t.node_gen(dpid = event.dpid).name_str()
if sw_name not in self.t.switches():
log.warn("Ignoring unknown switch %s" % sw_str)
return
#log.info("A new switch came up: %s", sw_str)
if sw is None:
log.info("Added a new switch %s" % sw_name)
sw = Switch()
self.switches[event.dpid] = sw
sw.connect(event.connection)
sw.connection.send(of.ofp_set_config(miss_send_len=MISS_SEND_LEN))
if len(self.switches)==len(self.t.switches()):
log.info("All of the switches are up")
self.all_switches_up = True
if self.statMonitorLock.locked():
self.statMonitorLock.release()
def _collectFlowStats(self):
log.info("attempt to capture STATS")
''' this function send the flow stat requests'''
if not self.statMonitorLock.locked():
# log.info("here it goes to monitor flow stats")
self.statMonitorLock.acquire()
self.statCntr = 0
self.flows = []
self.HostNameList = []
self.hostsList = []
for sw_name in self.t.layer_nodes(self.t.LAYER_EDGE):
sw_dpid = self.t.node_gen(name = sw_name).dpid
#print 'sw_dpid',sw_dpid ,'sw_name',sw_name
for port in range(1,self.t.k + 1):
if not self.t.isPortUp(port):
msg = of.ofp_stats_request()
msg.type = of.OFPST_FLOW
msg.body = of.ofp_flow_stats_request()
msg.body.out_port = port
self.switches[sw_dpid].connection.send(msg)
self.statCntr += 1
self.statMonitorLock.release()
statMonitorTimer = Timer(3.5, self._collectFlowStats)
statMonitorTimer.start()
def IP2name_dpid(self,IP):
IP = str(IP)
ten, p, e, h = (int(s) for s in IP.split('.'))
node_name = self.t.node_gen(p,e,h).name_str()
dpid_ = (p << 16) + (e << 8) + h
return (node_name, dpid_)
def _handle_FlowStatsReceived(self, event):
'''handle function for collected stats '''
# log.info( "flow stat collected, process begins")
#print 'event.stats', event.stats
self.statCntr -= 1
for stat in event.stats:
flowLivingTime = stat.duration_sec * 1e9 + stat.duration_nsec
if flowLivingTime <= 1:
flowLivingTime = 1
flowDemand = 8 * float(stat.byte_count) / flowLivingTime / self.bw
#print 'stat.match.in_port:', stat.match.in_port,'flow byte_count',stat.byte_count,'flowLivingTime:', flowLivingTime, 'flowDemand:', flowDemand, 'stat.match.scrIP:', stat.match.nw_src, 'stat.match.dstIP', stat.match.nw_dst
src_name, src = self.IP2name_dpid(stat.match.nw_src)
dst_name, dst = self.IP2name_dpid(stat.match.nw_dst)
#print 'src_name:',src_name,'dst_name:', dst_name,'src_dpid:', src,'dst_dpid:', dst
#print stat.match.nw_src, stat.match.nw_dst, stat.match.tp_src, stat.match.tp_dst
if flowDemand > 0.1:
if src not in self.hostsList:
self.hostsList.append(src)
self.HostNameList.append({'node_name':src_name, 'dpid':src})
if dst not in self.hostsList:
self.hostsList.append(dst)
self.HostNameList.append({'node_name':dst_name, 'dpid':dst})
self.flows.append({ 'demand': flowDemand, 'converged':False, 'src': src, 'dst': dst, 'recLimited': False, 'match': stat.match})
if self.statCntr == 0:
print "****flows processed, Estimating demands begins"
self._demandEstimator()
def _demandEstimator(self):
'''estimate the actual flow demands here'''
temp = self.flows
temp = sorted(temp, key=lambda temp:temp['src'])
self.flows = temp
self.bwReservation = {}
M, estFlows = demand_estimation(self.flows, sorted(self.hostsList))
for flow in estFlows:
demand = flow['demand']
if demand >= 0.1:
self._GlobalFirstFit(flow)
def _GlobalFirstFit(self,flow):
'''do the Hedera global first fit here'''
src_name = self.t.node_gen(dpid = flow['src']).name_str()
dst_name = self.t.node_gen(dpid = flow['dst']).name_str()
print 'Global Fisrt Fit for the elephant flow from ',src_name,'to', dst_name
paths = self.r.routes(src_name,dst_name)
print 'all routes found for the big flow:\n',paths
GFF_route = None
if paths == None:
return
else:
for path in paths:
fitCheck = True
for i in range(1,len(path)):
fitCheck = False
if self.bwReservation.has_key(path[i-1]) and self.bwReservation[path[i-1]].has_key(path[i]):
if self.bwReservation[path[i-1]][path[i]]['reserveDemand'] + flow['demand'] > 1 :
break
else:
#self.bwReservation[path[i-1]][path[i]]['reserveDemand'] += flow['demand']
fitCheck = True
else:
self.bwReservation[path[i-1]]={}
self.bwReservation[path[i-1]][path[i]]={'reserveDemand':0}
fitCheck = True
if fitCheck == True:
for i in range(1,len(path)):
self.bwReservation[path[i-1]][path[i]]['reserveDemand'] += flow['demand']
GFF_route = path
print "GFF route found:", path
break
if GFF_route != None:
"""install new GFF_path between source and destintaion"""
self. _install_GFF_path(GFF_route,flow['match'])
def _install_GFF_path(self,GFF_route, match):
'''installing the global first fit path here'''
flow_match = match
_route, match = self.matchDict[match.nw_src, match.nw_dst, match.tp_src, match.tp_dst]
if _route != GFF_route[1:-1] and not self.statMonitorLock.locked():
print "old route", _route
print "match info:", match.nw_src, match.nw_dst, match.tp_src, match.tp_dst
self.statMonitorLock.acquire()
''' Install entries on route between two switches. '''
route = GFF_route[1:-1]
print"GFF route to be installed between switches:", route
for i, node in enumerate(route):
node_dpid = self.t.node_gen(name = node).dpid
if i < len(route) - 1:
next_node = route[i + 1]
out_port, next_in_port = self.t.port(node, next_node)
else:
dpid_out, out_port = self.macTable[match.dl_dst]
#print 'out_dpid', dpid_out,self.t.node_gen(name = GFF_route[-1]).dpid
#print 'outPort', out_port
self.switches[node_dpid].install(out_port, match,idle_timeout = 10)
self.statMonitorLock.release()
self.matchDict[flow_match.nw_src, flow_match.nw_dst, flow_match.tp_src, flow_match.tp_dst] = (route, match)
print '_'*20
def launch(topo = None, routing = None, bw = None ):
#print topo
if not topo:
raise Exception ("Please specify the topology")
else:
t = buildTopo(topo)
r = getRouting(routing, t)
if bw == None:
bw = 10.0 #Mb/s
bw = float(bw/1000) #Gb/s
else:
bw = float(bw)/1000
core.registerNew(HController, t, r, bw)
log.info("** HController is running")
# utility functions
from DCTopo import FatTreeTopo
from mininet.util import makeNumeric
from DCRouting import HashedRouting, Routing
TOPOS = {'ft': FatTreeTopo}
ROUTING = {'ECMP' : HashedRouting}
def buildTopo(topo):
topo_name, topo_param = topo.split( ',' )
return TOPOS[topo_name](makeNumeric(topo_param))
def getRouting(routing, topo):
return ROUTING[routing](topo)
--
https://mail.python.org/mailman/listinfo/python-list