Hey Steven,
Yes thanks for this. I figured out that the initial location for nodes wasn't being set because of a bug in my python script driving the emulation. Essentially, I needed to send a location message to each slave explicitly after creating the session:


session.broker.handlerawmsg(location_conf_msg(session.location))

Where location_conf_msg is defined as:


def location_conf_msg(location):
        tlvdata = ""
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OBJ, "location") tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_TYPE, coreapi.CONF_TYPE_FLAGS_NONE) tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_DATA_TYPES,

(coreapi.CONF_DATA_TYPE_STRING,))
vals = [ location.refxyz[0], location.refxyz[1], location.refgeo[0], location.refgeo[1], location.refgeo[2], location.refscale ]
        vals = "|".join(map(str, vals))
tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES, vals)
        msg = coreapi.CoreConfMessage.pack(0, tlvdata)
        return msg

The full script is attached for anyone who is interested, although it contains a lot of extra stuff only relevant to my setup.
Dan

On 01/02/16 20:14, Steven Galgano wrote:
Dan,

Statistic tables are more appropriate for this situation:

  # emanesh n2 get table '*' phy

The following tables may prove useful:

  EventReceptionTable
  LocationEventInfoTable
  UnicastPacketDropTable0
  BroadcastPacketDropTable0

-steve


On 02/01/2016 03:07 PM, Dan O'Keeffe wrote:
Hey Steven,
Thanks for your reply. The output of "emanesh n2 get stat '*' phy" is
below, but I'm not quite sure which are related to location events. I
guess 'processedUpstreamControl/processedDownstreamControl'?

[emanesh (n2:47000)] ## get stat * phy
nem 1   phy  avgDownstreamProcessingDelay0 = 130.09324646
nem 1   phy  avgProcessAPIQueueDepth = 1.00073424808
nem 1   phy  avgProcessAPIQueueWait = 61.8266486164
nem 1   phy  avgTimedEventLatency = 0.0
nem 1   phy  avgTimedEventLatencyRatio = 0.0
nem 1   phy  avgUpstreamProcessingDelay0 = 146.843505859
nem 1   phy  numDownstreamBytesBroadcastGenerated0 = 0
nem 1   phy  numDownstreamBytesBroadcastRx0 = 997828
nem 1   phy  numDownstreamBytesBroadcastTx0 = 997828
nem 1   phy  numDownstreamBytesUnicastGenerated0 = 0
nem 1   phy  numDownstreamBytesUnicastRx0 = 0
nem 1   phy  numDownstreamBytesUnicastTx0 = 0
nem 1   phy  numDownstreamPacketsBroadcastDrop0 = 0
nem 1   phy  numDownstreamPacketsBroadcastGenerated0 = 0
nem 1   phy  numDownstreamPacketsBroadcastRx0 = 7372
nem 1   phy  numDownstreamPacketsBroadcastTx0 = 7372
nem 1   phy  numDownstreamPacketsUnicastDrop0 = 0
nem 1   phy  numDownstreamPacketsUnicastGenerated0 = 0
nem 1   phy  numDownstreamPacketsUnicastRx0 = 0
nem 1   phy  numDownstreamPacketsUnicastTx0 = 0
nem 1   phy  numUpstreamBytesBroadcastRx0 = 4565630
nem 1   phy  numUpstreamBytesBroadcastTx0 = 2990224
nem 1   phy  numUpstreamBytesUnicastRx0 = 0
nem 1   phy  numUpstreamBytesUnicastTx0 = 0
nem 1   phy  numUpstreamPacketsBroadcastDrop0 = 14124
nem 1   phy  numUpstreamPacketsBroadcastRx0 = 36202
nem 1   phy  numUpstreamPacketsBroadcastTx0 = 22078
nem 1   phy  numUpstreamPacketsUnicastDrop0 = 0
nem 1   phy  numUpstreamPacketsUnicastRx0 = 0
nem 1   phy  numUpstreamPacketsUnicastTx0 = 0
nem 1   phy  processedConfiguration = 0
nem 1   phy  processedDownstreamControl = 0
nem 1   phy  processedDownstreamPackets = 7372
nem 1   phy  processedEvents = 8
nem 1   phy  processedTimedEvents = 0
nem 1   phy  processedUpstreamControl = 0
nem 1   phy  processedUpstreamPackets = 36202


I'm a bit puzzled why this wouldn't just work out of the box for
CORE/EMANE, as I'm basically using the default configuration except that
I have created a separate multicast group (224.1.2.9) and control
interface (ctrl1 and eth1) for OTA traffic (as recommended by the CORE
docs).

Dan








On 01/02/16 19:39, Steven Galgano wrote:
Dan,

Your XML indicates you are using 2ray which requires location events:

   <param name="propagationmodel" value="2ray"/>

It is easier to debug emane using the control port. Connect to each
emulator instance and verify you are receiving location events using the
emulator physical layer statistic tables.

See the emane tutorial for more information on using the control port to
debug an emulation:

    https://github.com/adjacentlink/emane-tutorial/wiki

--
Steven Galgano
Adjacent Link LLC
www.adjacentlink.com


On 02/01/2016 02:29 PM, Dan O'Keeffe wrote:
Hi,
I'm having trouble getting a distributed CORE/EMANE emulation working
because of what I think is an EMANE problem. In particular, OLSR
broadcast packets received from remote containers are dropped by emane
with something like the following log message:

DEBUG PHYI 001 FrameworkPHY::processUpstreamPacket_i transmitter 4, src
4, dst 65535, drop propagation model missing info

I have 2 emulation servers in total, with 2 nodes running OLSR inside a
container on each (started using the CORE GUI).

Nodes on the same machine can communicate fine, but not nodes on
different machines.

Using tcpdump I can see that OLSR broadcast packets are reaching the
emane daemon running in the node containers on different machines.

Does anyone know what my problem might be or how to debug it further?

I'm running emane 0.9.2 and core 4.8. I've attached the emane configs
generated by core for one of the nodes in case that helps. The configs
are identical on the other machine except for nem and tap ids.

Thanks,
Dan


_______________________________________________
emane-users mailing list
[email protected]
http://pf.itd.nrl.navy.mil/mailman/listinfo/emane-users



#!/usr/bin/python

import sys,os,time,re,argparse,math,shutil,subprocess,socket


from core import pycore
from core.api import coreapi
from core.constants import *
from core.mobility import BasicRangeModel
from core.mobility import Ns2ScriptedMobility 
from core.emane.ieee80211abg import EmaneIeee80211abgModel
from core.emane.emane import EmaneGlobalModel
from core.misc.xmlsession import savesessionxml
from core.misc import ipaddr

from emanesh.events import EventService, LocationEvent

script_dir = os.path.dirname(os.path.realpath(__file__))

sys.path.append(script_dir)
from util import chmod_dir,pybool_to_javastr
from gen_mobility_trace import gen_trace
from gen_fixed_routes import create_static_routes


svc_dir='%s/vldb/myservices'%script_dir
conf_dir='%s/vldb/config'%script_dir
seep_jar = "seep-system-0.0.1-SNAPSHOT.jar"
mobility_params = [('file','%s/rwpt.ns_movements'%conf_dir),('refresh_ms',1000),
        ('loop',1),('autostart',1.0),('map',''),('script_start',''),('script_pause',''),('script_stop','')]


datacollect_template = '''#!/bin/bash
# session hook script; write commands here to execute on the host at the
# specified state


#echo "`hostname`:`pwd`" > /tmp/datacollect.log
#if [ -z "$SEEP_GITHUB_DIR" ]; then
#	echo "SEEP_GITHUB_DIR not set." >> /tmp/datacollect.log
#	SEEP_GITHUB_DIR=/data/dev/seep-github
#fi

#scriptDir=$SEEP_GITHUB_DIR/seep-system/examples/acita_demo_2015/core-emane
scriptDir=%s
timeStr=%s
k=%dk
mob=%.2fm
session=%ds
#resultsDir=$scriptDir/log/$timeStr
resultsDir=$scriptDir/log/$timeStr/$k/$mob/$session

expDir=$(pwd)

echo $expDir >> /tmp/datacollect.log
echo $scriptDir >> /tmp/datacollect.log
echo $timeStr >> /tmp/datacollect.log
echo $resultsDir >> /tmp/datacollect.log

mkdir -p $resultsDir

# Copy all log files to results dir
for d in n*.conf 
do
	cp $d/log1/*.log $resultsDir	
    if [ -e "$d/log2" ]
    then
        cp $d/log2/*.log $resultsDir	
    fi
	cp $d/mappingRecordOut.txt $resultsDir	
    mkdir $resultsDir/positions
	cp $d/*.xyz $resultsDir/positions	
	cp $d/mappingRecordOut.txt $scriptDir/log/$timeStr/session${session}MappingRecord.txt

    #Copy any emane stats
    if [ -d emane-stats ];
    then
        mkdir -p $resultsDir/emane-tables
        cp $d/emane-tables*.txt $resultsDir/emane-tables
    fi
done

#Copy mobility params if they exist
cp r_waypoints.params $resultsDir

#Copy dstat trace if exists 
cp dstat/*.csv $resultsDir

#Copy any emane stats
if [ -d emane-stats ];
then
	mkdir -p $resultsDir/emane-stats
	cp emane-stats/*.txt $resultsDir/emane-stats
fi

cd $scriptDir
#./gen_core_results.py --expDir log/$timeStr 
./gen_core_results.py --expDir $resultsDir
#./move_analysis.py --nodes 10 --expDir $resultsDir/positions
chmod -R go+rw $resultsDir
cd $expDir
'''
def run_sessions(time_str, k, mob, sessions, params):
    for session in sessions:
        print '*** Running session %d ***'%session
        run_session(time_str, k, mob, session, params)

def run_session(time_str, k, mob, exp_session, params):

    print 'params=',params

    distributed = bool(params['slave'])
    verbose = params['verbose'] 

    try:
        session_cfg = {'custom_services_dir':svc_dir, 'emane_log_level':'4',
                'verbose':"true" if verbose else "False", 
		'emane_event_monitor':"true" if params['emaneMobility'] else "false"} 
        if params['preserve']: session_cfg['preservedir'] = '1' 
        if distributed: 
            slave = params['slave']
            slaveip = socket.gethostbyname(slave)
            session_cfg['controlnet'] = "%s:172.16.1.0/24 %s:172.16.2.0/24"%(socket.gethostname(), slave)
            session_cfg['controlnet1'] = "%s:172.17.1.0/24 %s:172.17.2.0/24"%(socket.gethostname(), slave)
            session_cfg['controlnetif1'] = "eth4"
        else: 
            session_cfg['controlnet'] = "172.16.1.0/24"

        session = pycore.Session(cfg=session_cfg, persistent=True)

        session.master=True
        session.location.setrefgeo(47.5791667,-122.132322,2.00000)
        session.location.refscale = 100.0
        session.metadata.additem("canvas c1", "{name {Canvas1}} {wallpaper-style {upperleft}} {wallpaper {/home/dan/dev/seep-ita/seep-system/examples/acita_demo_2015/core-emane/vldb/config/sample1-bg-large.png}} {size {3000 3000}}")

        if distributed: 
            conf_msg = raw_emane_global_conf_msg(1)
            session.broker.handlerawmsg(conf_msg)
            session.confobj("emane", session, to_msg(conf_msg))

            remote_configure(session, slave, slaveip)
            session.broker.handlerawmsg(location_conf_msg(session.location))

        if params['sinkDisplay']:
            sink_display = start_query_sink_display("sinkDisplay.log", session.sessiondir, params)
            print 'Started query sink display ', sink_display 

        params['repoDir'] = get_repo_dir()

        #This is so broken, should find a better way...
        write_replication_factor(k, session.sessiondir)
        write_chain_length(params['h'], session.sessiondir)
        write_query_type(params['query'], session.sessiondir)
        write_extra_params(params, session.sessiondir)
        write_session_params(params, session.sessiondir)

        copy_seep_jar(session.sessiondir)
        trace_file = None
        if mob > 0.0:
            trace_params = dict(params)
            trace_params['h'] = mob + 1.0
            trace_params['l'] = mob - 1.0
            trace_params['o'] = mob
            trace_file = gen_trace(session.sessiondir, exp_session, trace_params)
            print 'Trace file=',trace_file


        model = params.get('model')
        tx_range = 410 #TODO: How to apply to emane?
        print 'Model=', model
        if model == "Emane":
            # Gives ping range of ~915m with 1:1 pixels to m and default 802.11
            # settings (2ray).
            session.cfg['emane_models'] = "RfPipe, Ieee80211abg, Bypass"
            session.emane.loadmodels()

            # set increasing Z coordinates
            wlan1 = session.addobj(cls = pycore.nodes.EmaneNode, name = "wlan1", objid=1, verbose=verbose)
            wlan1.setposition(x=80,y=50)
            if distributed: session.broker.handlerawmsg(wlan1.tonodemsg(flags=coreapi.CORE_API_ADD_FLAG|coreapi.CORE_API_STR_FLAG))

            names = EmaneIeee80211abgModel.getnames()
            values = list(EmaneIeee80211abgModel.getdefaultvalues())
            print 'Emane Model default names: %s'%(str(names))
            print 'Emane Model default values: %s'%(str(values))
            values[ names.index('mode') ] = '3'
            values[ names.index('retrylimit') ] = '0:7'
            values[ names.index('propagationmodel') ] = '2ray'
            values[ names.index('multicastrate') ] = '4'
            values[ names.index('unicastrate') ] = '4'
            values[ names.index('txpower') ] = '-10.0'
            values[ names.index('flowcontrolenable') ] = 'on'
            values[ names.index('flowcontroltokens') ] = '10'
            print 'Emane Model overridden values: %s'%(str(list(values)))
            if distributed:
                typeflags = coreapi.CONF_TYPE_FLAGS_UPDATE
                msg = EmaneIeee80211abgModel.toconfmsg(flags=0, nodenum=wlan1.objid,
		     typeflags=typeflags, values=values)
                session.broker.handlerawmsg(msg)

            session.emane.setconfig(wlan1.objid, EmaneIeee80211abgModel._name, values)

            if distributed:
                #No idea why I need to call this again here with a None node but seems I have to. 
                conf_msg = raw_emane_global_conf_msg(None)
                session.broker.handlerawmsg(conf_msg)
                session.confobj("emane", session, to_msg(conf_msg))

        elif model == "Basic" and not distributed:
            wlan1 = session.addobj(cls = pycore.nodes.WlanNode, name="wlan1",objid=1, verbose=verbose)
            wlan1.setposition(x=80,y=50)
            print 'Basic Range Model default values: %s'%(str(BasicRangeModel.getdefaultvalues()))
            model_cfg = list(BasicRangeModel.getdefaultvalues())
            model_cfg[0] = str(tx_range) #Similar to default effective emane range.
            model_cfg[1] = '11000' #Similar to default emane bandwidth.
            print 'Basic Range configured values: %s'%(str(model_cfg))
            wlan1.setmodel(BasicRangeModel, tuple(model_cfg))
        else:
            raise Exception("Unknown model/distributed: %s/%s"%(model,str(distributed)))


        if not add_to_server(session): 
            print 'Could not add to server'

        #Copy appropriate mapping constraints.
        exp_results_dir = '%s/log/%s'%(script_dir, time_str)
        session_constraints = '%s/session%dsMappingRecord.txt'%(exp_results_dir, exp_session)
        if os.path.exists(session_constraints):
            shutil.copy(session_constraints, '%s/mappingRecordIn.txt'%session.sessiondir)
        elif params['constraints']:
            session_constraints = '%s/static/%s'%(script_dir, params['constraints'])
            if not os.path.exists(session_constraints):
                raise Exception("Could not find sessions constraints: %s"%session_constraints)
            shutil.copy(session_constraints, '%s/mappingRecordIn.txt'%session.sessiondir)

        #TODO: Do I actually need the default route/multicast service?
        services_str = "IPForward|DefaultRoute|DefaultMulticastRoute|SSH"
        master_services = services_str
        if params['dstat']: master_services += "|dstat" 
 
        master = create_node(2, session, "%s|MeanderMaster"%master_services, wlan1,
                gen_grid_position(2+params['nodes'], params['nodes'] - 1), addinf=False, verbose=verbose)

        services_str += "|%s"%params['net-routing']
        if params['quagga']: services_str += "|zebra|vtysh"
        if params['emanestats']: services_str += "|EmaneStats"
        workers = []
        num_workers = get_num_workers(k, params)
        print 'num_workers=', num_workers
        placements = get_initial_placements(params['placement'], mob)
        print 'Initial placements=',placements
        if placements: 
            create_static_routes(placements, tx_range, session.sessiondir)

        print 'Creating workers.'
        for i in range(3,3+len(num_workers)):
            if placements:
                pos = placements[i]
            else:
                pos = gen_grid_position(i, params['nodes']-1)
            worker_services = "|".join(["MeanderWorker%d"%lwid for lwid in range(1, num_workers[i-3]+1)])
            if params['pcap']: worker_services += "|PcapSrc"
            workers.append(create_node(i, session, "%s|%s"%(services_str, worker_services), wlan1, pos, verbose=verbose)) 
       
        routers = []
        print 'Creating routers.'
        # Create auxiliary 'router' nodes if any left
        for i in range(3+len(num_workers), 1+params['nodes']):
            if placements:
                pos = placements[i]
            else:
                pos = gen_grid_position(i, params['nodes']-1)

            if distributed:
                routers.append(create_remote_node(i, session, slave, "%s"%services_str, wlan1, pos, verbose=verbose))
            else:
                routers.append(create_node(i, session, "%s"%services_str, wlan1, pos, verbose=verbose))

        if trace_file and not params['emaneMobility']:
            node_map = create_node_map(range(0,params['nodes']-2), workers+routers)
            print 'Node map=%s'%node_map
            mobility_params[4] = ('map', node_map)
            mobility_params[0] = ('file','%s/%s'%(session.sessiondir, trace_file))
            refresh_ms = int(params.get('refresh_ms', 1000))
            mobility_params[1] = ('refresh_ms', refresh_ms)
            mobility_params[2] = ('loop', 0)
            session.mobility.setconfig_keyvalues(wlan1.objid, 'ns2script', mobility_params)


        datacollect_hook = create_datacollect_hook(time_str, k, mob, exp_session) 
        session.sethook("hook:5","datacollect.sh",None,datacollect_hook)
        session.node_count="%d"%(params['nodes'])

        if params['saveconfig']:
            print 'Saving session config.'
            savesessionxml(session, '%s/session.xml'%session.sessiondir, '1.0')

        print 'Instantiating session ',exp_session, ' with CORE session id', session.sessionid
        session.instantiate()

        chmod_dir(session.sessiondir)
        for n in range(2,3+len(num_workers)):
            node_dir = '%s/n%d.conf'%(session.sessiondir,n)
            chmod_dir('%s/var.run/sshd'%node_dir, 0655)
            chmod_dir('%s/var.run.sshd'%node_dir, 0655)
            while not os.path.exists('%s/etc.ssh/ssh_host_rsa_key'%node_dir):
                time.sleep(1)
            os.chmod('%s/etc.ssh/ssh_host_rsa_key'%node_dir, 0700)

        print 'Waiting for a meander worker/master to terminate'
        watch_meander_services(session.sessiondir, map(lambda n: "n%d"%n,
            range(2,3 + sum(num_workers))))


        if model == "Emane" and params['emanestats']:
            record_emanesh_tables(session.sessiondir, range(3, params['nodes']+1), params)

        print 'Collecting data'
        session.datacollect()
        time.sleep(5)
        print 'Shutting down'

    finally:
        print 'Shutting down session.'
        if session:
            if distributed: remote_shutdown(session)
            session.shutdown()
            if 'server' in globals():
                print 'Removing session from core daemon server'
                server.delsession(session)

        if params['sinkDisplay'] and sink_display:
            print 'Shutting down query sink display ', sink_display
            sink_display.stdin.close()
            sink_display.terminate()


def remote_configure(session, slave, slaveip):
	session.broker.addserver(slave, slaveip, coreapi.CORE_API_PORT)
	session.broker.setupserver(slave)
	session.setstate(coreapi.CORE_EVENT_CONFIGURATION_STATE)
	tlvdata = coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
					coreapi.CORE_EVENT_CONFIGURATION_STATE)
	session.broker.handlerawmsg(coreapi.CoreEventMessage.pack(0, tlvdata))

def remote_instantiate(session):
	tlvdata = coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
					coreapi.CORE_EVENT_INSTANTIATION_STATE)
	msg = coreapi.CoreEventMessage.pack(0, tlvdata)
	session.broker.handlerawmsg(msg)

def remote_shutdown(session):
	tlvdata = coreapi.CoreEventTlv.pack(coreapi.CORE_TLV_EVENT_TYPE,
					coreapi.CORE_EVENT_SHUTDOWN_STATE)
	msg = coreapi.CoreEventMessage.pack(0, tlvdata)
	session.broker.handlerawmsg(msg)

def raw_emane_global_conf_msg(wlanid):
	global_names = EmaneGlobalModel.getnames()
	global_values = list(EmaneGlobalModel.getdefaultvalues())
	global_values[ global_names.index('otamanagerdevice') ] = 'ctrl1'
	global_values[ global_names.index('otamanagergroup') ] = '224.1.2.9:45702'
	msg = EmaneGlobalModel.toconfmsg(flags=0, nodenum=wlanid,
				     typeflags=coreapi.CONF_TYPE_FLAGS_NONE, values=global_values)
	print 'Created emane conf msg=%s'%str(msg)
	return msg

def to_msg(raw_msg):
        hdr = raw_msg[:coreapi.CoreMessage.hdrsiz]
        msgtype, flags, msglen = coreapi.CoreMessage.unpackhdr(hdr)
        msgcls = coreapi.msg_class(msgtype)
        return msgcls(flags, hdr, raw_msg[coreapi.CoreMessage.hdrsiz:])

def location_conf_msg(location):
	tlvdata = ""
	tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_OBJ, "location")
        tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_TYPE, coreapi.CONF_TYPE_FLAGS_NONE)
        tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_DATA_TYPES,
                                            (coreapi.CONF_DATA_TYPE_STRING,))
	vals = [ location.refxyz[0], location.refxyz[1], location.refgeo[0], location.refgeo[1], location.refgeo[2], location.refscale ]
	vals = "|".join(map(str, vals))
        tlvdata += coreapi.CoreConfTlv.pack(coreapi.CORE_TLV_CONF_VALUES, vals)
        msg = coreapi.CoreConfMessage.pack(0, tlvdata)
	return msg

def get_num_workers(k, params):
    q = params['query']
    sink_scale_factor = k if params['pyScaleOutSinks'] else 1
    if q == 'chain' or q == 'fr' or q == 'join': 
        num_workers = [1] * (1 + sink_scale_factor + (k * params['h']))
        if params['query'] == 'join':
            if params['h'] != 1: raise Exception('Only support query of height 1 for join')
            num_workers.append(1)
    elif q == 'debsGC13':
        if k > 2 or h > 1: raise Exception('Only support replication factors <= 2 for debs_gc_13') 
        num_workers = [1] * 23 
        if h > 0:
            num_workers[1] += 1 #A RB
            num_workers[5] += 1 #A RM
            num_workers[9] += 1 #A RF
            num_workers[12] += 1 #B RB
            num_workers[16] += 1 #B RM
            num_workers[20] += 1 #B RF
            if k > 1:
                num_workers[2] += 1 #A RCB
                num_workers[6] += 1 #A RCM
                num_workers[10] += 1 #A LF
                num_workers[13] += 1 #B RCB
                num_workers[17] += 1 #B RCM
                num_workers[21] += 1 #B LF

    elif q == 'nameAssist':
        num_workers = [2]+([1]*(1+ (k*3))) 
    elif q == 'heatMap':
        sources =  int(params['sources'])
        sinks = int(params['sinks'])
        fan_in = int(params['fanin'])
        height = int(math.ceil(math.log(sources, fan_in)))
        children = sources
        join_ops = 0 
        for i in range(0, height):
            parents = children / fan_in
            if children % fan_in > 0: parents += 1
            join_ops += parents
            children = parents
        print 'height=%d, join_ops=%d'%(height, join_ops)
        worker_nodes = params['nodes'] - 2
        if worker_nodes >= sources + k*(join_ops) + (sink_scale_factor * sinks):
            num_workers = [1] * (sources + k*(join_ops) + (sink_scale_factor * sinks))
        else:
            #Need to have multiple workers on some nodes.
            #N.B. Don't want to colocate replicas of the
            #same operator or the source.
            raise Exception("TODO")
    else: 
        raise Exception('Unknown query type: %s'%q)

    return num_workers

def create_node(i, session, services_str, wlan, pos, ip_offset=-1, addinf=True, verbose=False):
    tstart = time.time() 
    n = session.addobj(cls = pycore.nodes.CoreNode, name="n%d"%i, objid=i)
    taddobj = time.time() - tstart
    n.setposition(x=pos[0], y=pos[1])
    session.services.addservicestonode(n, "", services_str, verbose=verbose)
    taddservices = time.time() - tstart
    if addinf:
        ip = i + ip_offset 
        n.newnetif(net=wlan, addrlist=["10.0.0.%d/24"%(ip)], ifindex=0)
        taddnetif = time.time() - tstart
        n.cmd([SYSCTL_BIN, "net.ipv4.icmp_echo_ignore_broadcasts=0"])
        tcmd = time.time() - tstart
        print 'taddobj=%.3f,taddserv=%.3f,taddnet=%.3f,tcmd=%.3f'%(taddobj,taddservices,taddnetif,tcmd)
        print 'Created node n%d (10.0.0.%d) with initial pos=(%.1f,%.1f)'%(i,ip,pos[0], pos[1])
    else:
        print 'Created node n%d (no inf) with initial pos=(%.1f,%.1f)'%(i,pos[0], pos[1])
    return n

def create_remote_node(i, session, slave, services_str, wlan, pos, ip_offset=-1, addinf=True, verbose=False):
        n = pycore.nodes.CoreNode(session = session, objid = i,
                                    name = "n%d" % i, start=False)
        n.setposition(x=pos[0],y=pos[1])
        n.server = slave
        session.services.addservicestonode(n, "", services_str, verbose=verbose)
        session.broker.handlerawmsg(n.tonodemsg(flags=coreapi.CORE_API_ADD_FLAG | coreapi.CORE_API_STR_FLAG))

	prefix = ipaddr.IPv4Prefix("10.0.0.0/24")
	ip = i + ip_offset 
        tlvdata = coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_N1NUMBER,
                                           wlan.objid)
        tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_N2NUMBER, i)
        tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_TYPE,
                                            coreapi.CORE_LINK_WIRED)
        tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_IF2NUM, 0)
        tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_IF2IP4,
                                            prefix.addr(i+ip_offset))
                                            #"10.0.0.%d"%ip)
        tlvdata += coreapi.CoreLinkTlv.pack(coreapi.CORE_TLV_LINK_IF2IP4MASK,
                                            prefix.prefixlen)
        msg = coreapi.CoreLinkMessage.pack(coreapi.CORE_API_ADD_FLAG, tlvdata)
        session.broker.handlerawmsg(msg)

def create_node_map(ns_nums, nodes):
    print 'ns_nums=%s'%str(ns_nums)
    print 'nodes=%s'%str(nodes)
    if len(ns_nums) != len(nodes): 
        raise Exception("Invalid node mapping, %d != %d"%(len(ns_nums), len(nodes)))
    return ",".join(map(lambda (ns_num, node) : "%d:%d"%(ns_num,node.objid), zip(ns_nums, nodes)))

def get_initial_placements(placements, mobility):
    if not placements or mobility > 0.0:
        return None
    else:
        result = {}
        placements_path = '%s/static/%s'%(script_dir, placements)
        with open(placements_path, 'r') as pf:
            for line in pf:
                els = map(int, line.split(','))
                result[els[0]] = (els[1], els[2])

        return result

def gen_linear_position(i):
    return (50 * i, 100)

def gen_grid_position(i, nodes, offset=3, spacing=400):
    if i < offset: raise Exception("Invalid offset for %d: %d"%(i,offset))
    dim = max(1.0, math.floor(math.sqrt(nodes)))
    num_x = (i-offset) % dim 
    num_y = math.floor((i-offset) / dim)
    print 'i=',i, 'nodes=',nodes,'offset=',offset,'dim=',dim,'num_x=',num_x,'num_y=',num_y
    return (int(spacing * num_x), int(spacing * num_y)) 

def add_to_server(session):
    global server
    try:
        server.addsession(session)
        return True
    except NameError:
        print 'Name error'
        return False

def create_datacollect_hook(time_str, k, mob, exp_session):
    print 'Script dir = %s'%script_dir
    return datacollect_template%(script_dir, time_str, k, mob, exp_session)

def watch_meander_services(sessiondir, node_names):
    while True:
        for name in node_names:
            for process in ['worker1', 'worker2', 'master']:
                if os.path.exists("%s/%s.conf/%s.shutdown"%(sessiondir, name, process)):
		    print 'Shutdown file exists for node %s (%s) - exiting'%(name, process)
		    return

        time.sleep(0.5)

def write_replication_factor(k, session_dir):
    with open('%s/k.txt'%session_dir, 'w') as f:
        f.write(str(k))

def write_chain_length(h, session_dir):
    with open('%s/h.txt'%session_dir, 'w') as f:
        f.write(str(h))

def write_query_type(query, session_dir):
    with open('%s/query.txt'%session_dir, 'w') as f:
        f.write(str(query))

def write_extra_params(params, session_dir):
    with open('%s/extra_params.txt'%session_dir, 'w') as f:
        f.write('sources=%s\n'%str(params['sources']))
        f.write('sinks=%s\n'%str(params['sinks']))
        f.write('fanin=%s\n'%str(params['fanin']))

def write_session_params(params, session_dir):
    with open('%s/session_params.txt'%session_dir, 'w') as f:
        for k in params:
            f.write('%s=%s\n'%(k,str(params[k])))

def copy_seep_jar(session_dir):
    dest = '%s/lib'%session_dir
    os.mkdir(dest)
    shutil.copy('%s/../lib/%s'%(script_dir,seep_jar), dest)


def regen_sessions(time_str):
    raise Exception("TODO")

def start_query_sink_display(logfile, logdir, params):

    if params['query'] == 'fr':   
        args = ['java', 'FaceRecognitionDemo']
        cwd = script_dir 
    elif params['query'] == 'heatMap':   
        args = ['java', 'AcitaDemo']
        cwd = script_dir + '/heatMap'
    else: return None

    #os.mkdir(logdir)
    with open(logdir + "/" + logfile, 'w') as log:
        p = subprocess.Popen(args, stdout=log, cwd=cwd, stderr=subprocess.STDOUT, env=os.environ.copy())

    return p

def get_repo_dir():
    regex = re.compile('(.*)/seep-system')
    return re.search(regex, script_dir).groups()[0]

def record_emanesh_tables(sessiondir, nodes, params):
    for node in nodes:
        args = ['/usr/sbin/vcmd', '-c', '%s/n%d'%(sessiondir, node), './record-emane-tables.sh', str(node)]
        with open('%s/n%d.conf/emane-tables-n%d.txt'%(sessiondir, node, node), 'w') as log:
            p = subprocess.Popen(args, stdout=log, cwd='%s/n%d.conf'%(sessiondir,node), stderr=subprocess.STDOUT, env=os.environ.copy())
            p.wait()

if __name__ == "__main__" or __name__ == "__builtin__":
    parser = argparse.ArgumentParser(description='Run several meander experiments on CORE')
    parser.add_argument('--k', dest='k', default='2', help='replication factors (2)')
    parser.add_argument('--h', dest='h', default='2', help='chain length (2)')
    parser.add_argument('--x', dest='x', default='1200', help='Grid x dimension (1200)')
    parser.add_argument('--y', dest='y', default='1200', help='Grid y dimension (1200)')
    parser.add_argument('--duration', dest='duration', default='100000', help='Mobility params duration')
    parser.add_argument('--query', dest='query', default='chain', help='query type: (chain), join')
    parser.add_argument('--sources', dest='sources', default='1', help='Sources')
    parser.add_argument('--sinks', dest='sinks', default='1', help='Sinks (non-replicated)')
    parser.add_argument('--fanin', dest='fanin', default='2', help='Join fan-in')
    parser.add_argument('--pausetime', dest='pt', default='5.0', help='pause time (5.0)')
    parser.add_argument('--sessions', dest='sessions', default='1', help='number of sessions to run')
    parser.add_argument('--specific', dest='specific', default=False, action='store_true', help='Run a specific session')
    parser.add_argument('--plotOnly', dest='plot_time_str', default=None, help='time_str of run to plot (hh-mm-DDDddmmyy)[None]')
    parser.add_argument('--nodes', dest='nodes', default='10', help='Total number of core nodes in network')
    parser.add_argument('--model', dest='model', default="Emane", help='Wireless model (Basic, Emane)')
    parser.add_argument('--routing', dest='routing', default='OLSRETX',
            help='Net layer routing alg (OLSR, OLSRETX, OSPFv3MDR)')
    parser.add_argument('--preserve', dest='preserve', default=False, action='store_true', help='Preserve session directories')
    parser.add_argument('--saveconfig', dest='saveconfig', default=False, action='store_true', help='Export the session configuration to an XML file')
    parser.add_argument('--constraints', dest='constraints', default='', help='Export the session configuration to an XML file')
    parser.add_argument('--placement', dest='placement', default='', help='Explicit static topology to use for all sessions')
    parser.add_argument('--iperf', dest='iperf', default=False, action='store_true', help='Do an iperf test')
    parser.add_argument('--iperfcxns', dest='iperfcxns', default=None, help='Do an iperf test')
    parser.add_argument('--scaleSinks', dest='scale_sinks', default=False, action='store_true', help='Replicate sinks k times')
    parser.add_argument('--quagga', dest='quagga', default=False, action='store_true', help='Start quagga services (zebra, vtysh)')
    parser.add_argument('--pcap', dest='pcap', default=False, action='store_true', help='Start pcap service for workers.')
    parser.add_argument('--emanestats', dest='emanestats', default=False, action='store_true', help='Start emanestats service on master')
    parser.add_argument('--dstat', dest='dstat', default=False, action='store_true', help='Start dstat service on master.')
    parser.add_argument('--duplex', dest='duplex', default=False, action='store_true', help='Send in both directions for iperf tests')
    parser.add_argument('--verbose', dest='verbose', action='store_true', default=False, help='Verbose core logging')
    parser.add_argument('--sinkDisplay', dest='sink_display', default=False, action='store_true', help='Start a sink display for query output')
    parser.add_argument('--gui', dest='gui', default=False, action='store_true', help='Show placements in core GUI')
    parser.add_argument('--slave', dest='slave', default=None, help='Hostname of slave')
    parser.add_argument('--emaneMobility', dest='emane_mobility', default=False, action='store_true', help='Use emane location events for mobility (instead of ns2)')
    args=parser.parse_args()

    k=int(args.k)
    pt=float(args.pt)
    params = {'nodes':int(args.nodes)}
    if args.model: params['model']=args.model
    params['net-routing']=args.routing
    params['specific']=args.specific
    params['preserve']=args.preserve
    params['h']=int(args.h)
    params['x']=int(args.x)
    params['y']=int(args.y)
    params['duration']=args.duration
    params['query']=args.query
    params['saveconfig']=args.saveconfig
    params['constraints']=args.constraints
    params['placement']=args.placement
    params['sources']=args.sources
    params['sinks']=args.sinks
    params['fanin']=args.fanin
    params['iperf']=args.iperf
    params['iperfcxns']=args.iperfcxns
    params['pyScaleOutSinks']=args.scale_sinks
    params['scaleOutSinks']=pybool_to_javastr(args.scale_sinks)
    params['quagga']=args.quagga
    params['pcap']=args.pcap
    params['emanestats']=args.emanestats
    params['dstat']=args.dstat
    params['duplex']=args.duplex
    params['sinkDisplay']=args.sink_display
    params['enableSinkDisplay']=pybool_to_javastr(args.sink_display)
    params['enableGUI']= "true" if args.gui else "false"
    params['slave']= args.slave 
    params['verbose']= args.verbose 
    params['emaneMobility']= args.emane_mobility

    sessions = int(args.sessions)
    session_ids = [sessions] if args.specific else range(0,sessions)
    if args.plot_time_str:
        time_str = args.plot_time_str
        regen_sessions(time_str)
    else:
        time_str = time.strftime('%H-%M-%S-%a%d%m%y')
        run_sessions(time_str, k, pt, session_ids,params)
_______________________________________________
emane-users mailing list
[email protected]
http://pf.itd.nrl.navy.mil/mailman/listinfo/emane-users

Reply via email to