Daniel, Your post prompted me to go back and retest the broken code I had for r.horizon. I have fixed it and have attached a script that I think is somewhat general, unlike the previous one. There are several more issues I should mention: 1. Each process MUST run in its own mapset. Grass mapsets are not thread or process safe. The multiple instances will overwrite each other and create jibberish. 2. Use python scripts outside of grass so you can define the mapset when you create a separate process. 3. Manually spawn only enough processes as you have CPU cores available. After that, force your code to wait until that batch is completed. Once that set is done, spawn the next set. I did _not_ do this in the code I posted earlier. Multiprocessing with grass requires a lot of careful hand-holding to make it work. hopefully the following script is useful. Collin Bode Project Manager, Desktop Watershed Integrated Program National Center for Earth-surface Dynamics University of California, Berkeley Cell: 415-305-5346 Lab: 510-643-9294 |
#!/usr/bin/env python ############################################################################ # # MODULE: rhorizon_mp.py # AUTHOR: Collin Bode, UC Berkeley March, 2012 # # PURPOSE: Runs r.horizon using multiprocessing, one process per CPU core. # # SOURCE MAPS: bare-earth dem and canopy dem. all else is calculated from them. # # COPYRIGHT: (c) 2012 Collin Bode # This program is free software under the GNU General Public # License (>=v2). Read the file COPYING that comes with GRASS # for details. # #############################################################################
import sys import os import shutil import re import datetime as dt import multiprocessing as mp #gisbase = os.environ['GISBASE'] = "/usr/lib64/grass-6.4.2" # Grass 6.4.2 from RPM gisbase = os.environ['GISBASE'] = "/usr/local/grass-7.0.svn/" # Grass 7.0svn gisdbase = os.path.abspath("/home/collin/grass/") # <-- INSERT YOUR GRASS GISDBASE HERE sys.path.append(os.path.join(os.environ['GISBASE'], "etc", "python")) import grass.script as grass import grass.script.setup as gsetup # R.HORIZON PARAMETERS dem = 'sfk10mdem' # <-- INSERT YOUR ELEVATION MAP HERE, ASSUMED TO BE IN PERMANENT location = 'sfk10m' # Grass Location. mhorizon = 'horizon' # Grass Mapset. Horizon maps will be moved here. Can change to PERMANENT. maxdistance = '10000' # maxdistance = 10000 meters (longer than the diagnal of the map) hstep = 1 # horizonstep = 1 degree (causing 360 maps to be created) dist = '0.5' # normal range (0.5 - 1.5) previous runs used 0.3 ?artifacting? # dist=1.0 or greater uses simplified calculation that causes artifacts def worker_horizon(cpu,demh,azimuth): # Setup Grass Environment, including temporary mapset mtemp = 'temp'+str(cpu).zfill(2) gsetup.init(gisbase, gisdbase, location, mtemp) #grass.run_command("g.region",region="b5k") # map names prefhor = demh+'hor' demh = demh+'@PERMANENT' # run r.horizon for one azimuth angle azi = str(azimuth) grass.run_command("r.horizon", elevin=demh, \ direction=azi, maxdistance=maxdistance, \ horizon=prefhor, dist=dist, overwrite = 1) # rename horizon map to proper azimuth name az = str(azimuth).zfill(3) tempmap = prefhor+'_0' horizonmap = prefhor+'_'+az cmd = tempmap+","+horizonmap grass.run_command("g.rename",rast=cmd,overwrite=1) def create_temp(cores): gsetup.init(gisbase, gisdbase, location, 'PERMANENT') for count in range(0,cores,1): temp = 'temp'+str(count).zfill(2) temp_path = gisdbase+'/'+location+'/'+temp if(os.path.exists(temp_path) == False): grass.run_command("g.mapset",flags="c", mapset=temp, quiet=1) printout(temp+" mapset created.") else: printout(temp+" mapset already exists. skipping...") def remove_temp(cores): # setup is in the target mapset, not temp mapsets gsetup.init(gisbase, gisdbase, location, mhorizon) # Delete the temp mapset for count in range(0,cores): mapset_temp = 'temp'+str(count).zfill(2) grass.run_command("g.mapsets", removemapset=mapset_temp) temp_path = gisdbase+'/'+location+'/'+mapset_temp shutil.rmtree(temp_path) def copy_mapset(mapset_from,mapset_to,regfilter,overwrite): # list contents of temp mapset grass.run_command("g.mapset", mapset=mapset_from, quiet=1) raster_list = grass.list_pairs(type = 'rast') # Switch to target mapset and copy rasters over grass.run_command("g.mapset", mapset=mapset_to,quiet=1) for rast in raster_list: if(rast[1] != 'PERMANENT' and re.search(regfilter,rast[0])): old = rast[0]+ '@' + rast[1] new = rast[0] cmd = old+","+new grass.run_command("g.copy", rast=cmd, overwrite=overwrite) def printout(str_text): timestamp = dt.datetime.strftime(dt.datetime.now(),"%H:%M:%S") lf.write(timestamp+": "+str_text+'\n') print timestamp+": "+str_text def main(): ################################## # HORIZON CALCULATIONS ################################## global cores global lf cores = mp.cpu_count() # Open log file tlog = dt.datetime.strftime(dt.datetime.now(),"%Y-%m-%d_h%Hm%M") lf = open('rhorizon_'+tlog+'.log', 'w') # write params to log file printout("STARTING R.SUN MODELING RUN") printout("Location: "+location) printout("Mapset: "+mhorizon) printout("This computer has "+str(cores)+" CPU cores.") printout('dem: '+dem) printout('horizon mapset: '+mhorizon) printout('horizonstep: '+str(hstep)) printout('dist: '+dist) printout('maxdistance: '+maxdistance) printout('_________________________________') # start logging start = dt.datetime.now() starttime = dt.datetime.strftime(start,"%m-%d %H:%M:%S") printout('START r.horizon '+ starttime) # Create one temp directory for each CPU core printout("Creating Temporary directories, one per cpu core.") create_temp(cores) # Spawn process for r.horizon DEM jobs = [] c = 0 for az in range(300,361,hstep): if(c < cores): p = mp.Process(target=worker_horizon, args=(c,dem,az)) p.start() jobs.append(p) pid = str(p.pid) #printout("R.Horizon: dem = "+dem+" process id = "+pid) c += 1 else: # Wait for all the Processes to finish printout("R.Horizon. Cores maxxed out. Waiting for processes up to "+str(az)+" to finish.") c = 0 for p in jobs: pid = str(p.pid) p.join() #printout("R3. r.horizon. dem = "+demr+", pid = "+pid+" joined.") printout("R.Horizon finished for "+dem) # Copy all the files back over to horizon mapset printout("R.Horizon finished. copying files back to "+mhorizon) for cpu in range(0,cores): mtemp = 'temp'+str(cpu).zfill(2) suf = 'hor_' copy_mapset(mtemp, mhorizon, suf, 1) # Delete the temp mapsets remove_temp(cores) # Stop Logging and record total processing time. end = dt.datetime.now() endtime = dt.datetime.strftime(end,"%m-%d %H:%M:%S") processingtime = end - start printout('END r.horizon, processing time: '+str(processingtime)) lf.close() sys.exit("FINISHED.") if __name__ == "__main__": #options, flags = grass.parser() main()
On Apr 17, 2012, at 12:21 PM, Collin Bode wrote:
|
_______________________________________________ grass-user mailing list grass-user@lists.osgeo.org http://lists.osgeo.org/mailman/listinfo/grass-user