Changed the default number of threads to 1. If you specify --max-jobs
without a value, you get one thread per core. --max-jobs=N means use N
threads.
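
For illustration only, the three cases can be sketched like this. The
names MaxJobsOption and resolve() are hypothetical and do not appear in
the patch; the clamp to a minimum of 1 mirrors what the patch does.

```java
// Illustrative only: MaxJobsOption and resolve() are hypothetical names,
// not code from the patch.
final class MaxJobsOption {
    /**
     * given: true if --max-jobs appeared on the command line
     * value: the text after '=', or "" when no value was supplied
     * cores: number of CPU cores reported by the JVM
     */
    static int resolve(boolean given, String value, int cores) {
        if (!given)
            return 1;                       // option absent: sequential
        int n = value.isEmpty() ? cores : Integer.parseInt(value);
        return Math.max(1, n);              // never fewer than one thread
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("no option:    " + resolve(false, "", cores));
        System.out.println("--max-jobs:   " + resolve(true, "", cores));
        System.out.println("--max-jobs=3: " + resolve(true, "3", cores));
    }
}
```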

With regard to comparing the output with known good maps to see if the
parallel processing is corrupting anything, one problem is that the
files contain timestamps. I have test code that zeros the timestamps,
and with it I have been able to compare the output from different runs.
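
As a rough sketch of that kind of comparison (this is not the actual
test code), the idea is to blank out the byte ranges that hold
timestamps before comparing. Where the timestamps live in the file is
not stated here, so the {offset, length} ranges are placeholders that
the caller has to supply; MaskedCompare is a made-up name.

```java
import java.util.Arrays;

// Hypothetical helper: compares two files' bytes while ignoring the
// byte ranges the caller says contain timestamps.
final class MaskedCompare {
    // Returns a copy of data with each {offset, length} range zeroed.
    static byte[] zeroRanges(byte[] data, int[][] ranges) {
        byte[] copy = data.clone();
        for (int[] r : ranges) {
            int from = Math.min(r[0], copy.length);
            int to = Math.min(r[0] + r[1], copy.length);
            Arrays.fill(copy, from, to, (byte) 0);
        }
        return copy;
    }

    // True if a and b are identical once the given ranges are zeroed.
    static boolean sameIgnoring(byte[] a, byte[] b, int[][] ranges) {
        return Arrays.equals(zeroRanges(a, ranges), zeroRanges(b, ranges));
    }
}
```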

What I have seen is that sometimes there are differences that appear
to be due to the order in which the labels are written to the output
file. If only the order is changing, that is harmless, but it would be
nice to understand how it's happening (I have a theory about this that
is yet to be proven).

---------

Now preserves order in which files are combined (thanks Steve for the
tweak).
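
A minimal sketch of the general technique, assuming the usual
java.util.concurrent approach: keep the Futures in submission order and
collect the results in that same order, regardless of which job
finishes first. OrderedJobs and runAll() are illustrative names, and
the sleep is only a stand-in for making a map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example, not code from the patch.
final class OrderedJobs {
    static List<String> runAll(final List<String> inputs, int maxJobs) {
        ExecutorService pool = Executors.newFixedThreadPool(maxJobs);
        try {
            List<Future<String>> futures = new ArrayList<Future<String>>();
            for (int i = 0; i < inputs.size(); i++) {
                final String name = inputs.get(i);
                // later jobs sleep less, so they tend to finish first
                final long delay = 10L * (inputs.size() - i);
                futures.add(pool.submit(new Callable<String>() {
                    public String call() throws Exception {
                        Thread.sleep(delay);   // stand-in for map making
                        return name + ".img";
                    }
                }));
            }
            // Walk the futures in submission order, not completion order.
            List<String> outputs = new ArrayList<String>();
            for (Future<String> f : futures)
                outputs.add(f.get());
            return outputs;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```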

---------

Now serialises reading of style files and map source to avoid
reentrancy issue in GType.
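
To illustrate the serialising idea in isolation: a block synchronised
on the class object can only be entered by one thread at a time, so the
non-reentrant work never overlaps. SerialisedLoad is a made-up class
and the sleep stands in for the real reading work.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: counts how many threads are ever inside the
// synchronised section at once.
final class SerialisedLoad {
    private static final AtomicInteger inside = new AtomicInteger();
    private static final AtomicInteger maxInside = new AtomicInteger();

    static void load() {
        synchronized (SerialisedLoad.class) {
            int now = inside.incrementAndGet();
            if (now > maxInside.get())
                maxInside.set(now);
            try {
                Thread.sleep(5);   // stand-in for the non-reentrant work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inside.decrementAndGet();
        }
    }

    // Runs load() on several threads and reports the peak overlap seen.
    static int maxConcurrentLoads(int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() { load(); }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxInside.get();
    }
}
```

The cost is that this part of each job no longer runs in parallel, so
only the work outside the lock benefits from the extra threads.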

Reworked top-level loop that waits for the parallel jobs to complete.
Appears to use a lot less CPU and could possibly influence the weird
problems some were reporting on Windows/Mac - please retest with this
version.

Steve, I haven't incorporated your changed options handling stuff yet
but will do in the future if (a) you don't commit it separately and (b)
we can fix the reliability issues with this parallelisation code.

---------

Now respects --max-jobs again (broken in last patch).

---------

Now reports exceptions in the worker threads.
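
As a sketch of how this works with java.util.concurrent in general: an
exception thrown inside a job does not surface until Future.get() is
called, at which point it arrives wrapped in an ExecutionException.
WorkerFailure and describeFailure() are illustrative names only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example, not code from the patch.
final class WorkerFailure {
    static String describeFailure(Callable<String> job) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(job).get();
            return "ok";
        } catch (ExecutionException e) {
            // getCause() is the exception the worker thread threw
            return "job failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }
}
```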

---------

Here's a better fix than last night's effort for the problem where the
mapname and description for each job were getting clobbered due to the
way that the command args are processed. Each job now gets a "snapshot"
of the command args so it doesn't matter if they subsequently get
changed.
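
The snapshot idea in miniature, using a made-up JobOptions class rather
than the real CommandArgs: the copy constructor gives each job its own
copy of the options, so later changes to the shared object are not
seen by a job that has already been submitted.

```java
import java.util.Properties;

// Hypothetical stand-in for an options holder; not the real CommandArgs.
final class JobOptions {
    private final Properties options;

    JobOptions() {
        options = new Properties();
    }

    // Copy constructor: the snapshot gets its own Properties object.
    JobOptions(JobOptions other) {
        options = new Properties();
        options.putAll(other.options);
    }

    void set(String key, String value) {
        options.setProperty(key, value);
    }

    String get(String key) {
        return options.getProperty(key);
    }
}
```

For example, if the shared options hold mapname=63240001 when the
snapshot is taken and the shared value is then bumped to 63240002, the
snapshot still reports 63240001.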

---------

Whoops! Fixed a bad bug whereby each map was being output to the same
file. Not sure if the fix is very elegant but at least it's not being
silly any more.

Now limits the default value of max-jobs to 4 no matter how many cores
you have as further testing shows that having more threads just burns
CPU cycles but doesn't actually finish any quicker. I guess the memory
system is limiting the performance and the CPUs are spinning waiting
for access.

Now showing a real speedup of around 240% (my earlier higher claim
was based on CPU usage and I now realise that was erroneous, sorry).

---------

Now defaults to creating a thread per core so without doing anything
you should see a speedup on an SMP box when processing multiple maps.

You can use --max-jobs=N to limit the concurrency - you may
want to specify that if you can't increase the VM size to what is
required. However, it occurs to me that if you can afford a box with
more than 2 cores, then you can probably afford a reasonable amount of
memory (otherwise, what's the point in having more cores?)

Added help blurb.

---------

OK, let it not be said that I don't listen to others!

The attached patch provides support for making maps in parallel. By
default, the behaviour is the same as before but if you specify
--num-threads=N where N is greater than 1, it will process N maps at
the same time and then combine the results (if required). Don't forget
to increase the heap size appropriately.

A quick test on the big box shows good speedup. Specifying
--num-threads=4 and a 2GB VM size, I was seeing better than 380%
utilisation with 8 cores in use.

I suspect the performance limitation here will be VM size and memory
system bandwidth.

BTW - I don't think num-threads is actually the best name for the
option, so please suggest alternatives.

Cheers,

Mark
diff --git a/resources/help/en/options b/resources/help/en/options
index 348272e..d64f0f8 100644
--- a/resources/help/en/options
+++ b/resources/help/en/options
@@ -101,6 +101,13 @@ Product description options:
 --overview-mapname
 
 Misc options:
+--max-jobs[=number]
+	When number is specified, allow that number of maps to be
+	processed concurrently. If number is not specified, the limit
+	is set equal to the number of CPU cores. If this option is not
+	given at all, the limit is 1 (i.e. the maps are processed
+	sequentially).
+
 --block-size=''number''
 	Changes the block size that is used in the generated map.
 	There is no general reason why you would want to do this.
diff --git a/src/uk/me/parabola/mkgmap/CommandArgs.java b/src/uk/me/parabola/mkgmap/CommandArgs.java
index c761737..66b404e 100644
--- a/src/uk/me/parabola/mkgmap/CommandArgs.java
+++ b/src/uk/me/parabola/mkgmap/CommandArgs.java
@@ -43,9 +43,17 @@ import uk.me.parabola.util.EnhancedProperties;
 public class CommandArgs {
 	private static final Logger log = Logger.getLogger(CommandArgs.class);
 
-	private final ArgList arglist = new ArgList();
+	private final ArgList arglist;
 
-	{
+	private final ArgumentProcessor proc;
+	private final EnhancedProperties currentOptions;
+
+	private boolean mapnameWasSet;
+
+	public CommandArgs(ArgumentProcessor proc) {
+		this.proc = proc;
+		currentOptions = new EnhancedProperties();
+		arglist = new ArgList();
 		// Set some default values.  It is as if these were on the command
 		// line before any user supplied options.
 		arglist.add(new CommandOption("mapname", "63240001"));
@@ -53,13 +61,14 @@ public class CommandArgs {
 		arglist.add(new CommandOption("overview-mapname", "63240000"));
 	}
 
-	private final ArgumentProcessor proc;
-	private final EnhancedProperties currentOptions = new EnhancedProperties();
-
-	private boolean mapnameWasSet;
-
-	public CommandArgs(ArgumentProcessor proc) {
-		this.proc = proc;
+	/*
+	 * Create a new CommandArgs that is a "snapshot" of an existing one
+	 */
+	public CommandArgs(CommandArgs args) {
+		proc = args.proc;
+		mapnameWasSet = args.mapnameWasSet;
+		currentOptions = (EnhancedProperties)args.currentOptions.clone();
+		arglist = new ArgList(args.arglist);
 	}
 
 	/**
@@ -263,10 +272,21 @@ public class CommandArgs {
 	 * The arguments are held in this list.
 	 */
 	private class ArgList implements Iterable<ArgType> {
-		private final List<ArgType> alist = new ArrayList<ArgType>();
+		private final List<ArgType> alist;
 
 		private int filenameCount;
 
+		public ArgList() {
+			alist = new ArrayList<ArgType>();
+		}
+
+		public ArgList(ArgList argList) {
+			// Copy the underlying list directly. ArgList is not Cloneable,
+			// so clone() would always throw and leave the snapshot empty.
+			alist = new ArrayList<ArgType>(argList.alist);
+			filenameCount = argList.filenameCount;
+		}
+
 		public void add(CommandOption option) {
 			alist.add(option);
 		}
diff --git a/src/uk/me/parabola/mkgmap/main/Main.java b/src/uk/me/parabola/mkgmap/main/Main.java
index 6af2120..1b77467 100644
--- a/src/uk/me/parabola/mkgmap/main/Main.java
+++ b/src/uk/me/parabola/mkgmap/main/Main.java
@@ -25,10 +25,16 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
 import uk.me.parabola.imgfmt.ExitException;
 import uk.me.parabola.log.Logger;
 import uk.me.parabola.mkgmap.ArgumentProcessor;
@@ -60,13 +66,15 @@ public class Main implements ArgumentProcessor {
 	// Final .img file combiners.
 	private final List<Combiner> combiners = new ArrayList<Combiner>();
 
-	// The filenames that will be used in pass2.
-	private final List<String> filenames = new ArrayList<String>();
-
 	private final Map<String, MapProcessor> processMap = new HashMap<String, MapProcessor>();
 	private String styleFile = "classpath:styles";
 	private boolean verbose;
 
+	public List<Future<String>> futures = new LinkedList<Future<String>>();
+	public ExecutorService threadPool = null;
+	// default number of threads
+	public int maxJobs = 1;
+
 	/**
 	 * The main program to make or combine maps.  We now use a two pass process,
 	 * first going through the arguments and make any maps and collect names
@@ -142,14 +150,26 @@ public class Main implements ArgumentProcessor {
 	 * @param args The command arguments.
 	 * @param filename The filename to process.
 	 */
-	public void processFilename(CommandArgs args, String filename) {
-		String ext = extractExtension(filename);
+	public void processFilename(CommandArgs args, final String filename) {
+		final String ext = extractExtension(filename);
 		log.debug("file", filename, ", extension is", ext);
 
-		MapProcessor mp = mapMaker(ext);
-		String output = mp.makeMap(args, filename);
-		log.debug("adding output name", output);
-		filenames.add(output);
+		final MapProcessor mp = mapMaker(ext);
+		final CommandArgs jobArgs = new CommandArgs(args);
+
+		if(threadPool == null) {
+			log.info("Creating thread pool with " + maxJobs + " threads");
+			threadPool = Executors.newFixedThreadPool(maxJobs);
+		}
+
+		log.info("Submitting job " + filename);
+		futures.add(threadPool.submit(new Callable<String>() {
+				public String call() {
+					String output = mp.makeMap(jobArgs, filename);
+					log.debug("adding output name", output);
+					return output;
+				}
+			}));
 	}
 
 	private MapProcessor mapMaker(String ext) {
@@ -183,6 +203,15 @@ public class Main implements ArgumentProcessor {
 			verbose = true;
 		} else if (opt.equals("list-styles")) {
 			listStyles();
+		} else if (opt.equals("max-jobs")) {
+			if(val.length() > 0)
+				maxJobs = Integer.parseInt(val);
+			else
+				maxJobs = Runtime.getRuntime().availableProcessors();
+			if(maxJobs < 1) {
+				log.warn("max-jobs has to be at least 1");
+				maxJobs = 1;
+			}
 		} else if (opt.equals("version")) {
 			System.err.println(Version.VERSION);
 			System.exit(0);
@@ -248,9 +277,31 @@ public class Main implements ArgumentProcessor {
 	}
 
 	public void endOptions(CommandArgs args) {
+
+		List<String> filenames = new ArrayList<String>();
+
+		if(threadPool != null) {
+			threadPool.shutdown();
+			while(!futures.isEmpty()) {
+				try {
+					// don't call get() until a job has finished
+					if(futures.get(0).isDone())
+						filenames.add(futures.remove(0).get());
+					else
+						Thread.sleep(10);
+				}
+				catch(Exception e) {
+					log.error("" + e);
+					e.printStackTrace(System.err);
+				}
+			}
+		}
+
 		if (combiners.isEmpty())
 			return;
 
+		log.info("Combining maps");
+
 		// Get them all set up.
 		for (Combiner c : combiners)
 			c.init(args);
@@ -260,6 +311,7 @@ public class Main implements ArgumentProcessor {
 			if (file == null)
 				continue;
 			try {
+				log.info("  " + file);
 				FileInfo mapReader = FileInfo.getFileInfo(file);
 				for (Combiner c : combiners) {
 					c.onMapEnd(mapReader);
diff --git a/src/uk/me/parabola/mkgmap/main/MapMaker.java b/src/uk/me/parabola/mkgmap/main/MapMaker.java
index 60e24a3..982ccc1 100644
--- a/src/uk/me/parabola/mkgmap/main/MapMaker.java
+++ b/src/uk/me/parabola/mkgmap/main/MapMaker.java
@@ -79,6 +79,7 @@ public class MapMaker implements MapProcessor {
 		params.setBlockSize(args.getBlockSize());
 		params.setMapDescription(args.getDescription());
 
+		log.info("Started making", args.getMapname(), "(" + args.getDescription() + ")");
 		try {
 			Map map = Map.createMap(args.getMapname(), params);
 			setOptions(map, args);
@@ -132,9 +133,14 @@ public class MapMaker implements MapProcessor {
 	private LoadableMapDataSource loadFromFile(CommandArgs args, String name) throws
 			FileNotFoundException, FormatException
 	{
-		LoadableMapDataSource src = MapReader.createMapReader(name);
-		src.config(args.getProperties());
-		src.load(name);
+		LoadableMapDataSource src;
+		// work around non-reentrancy of GType priority stuff
+		// by serialising the map reading
+		synchronized(MapMaker.class) {
+			src = MapReader.createMapReader(name);
+			src.config(args.getProperties());
+			src.load(name);
+		}
 		return src;
 	}
 
_______________________________________________
mkgmap-dev mailing list
mkgmap-dev@lists.mkgmap.org.uk
http://www.mkgmap.org.uk/mailman/listinfo/mkgmap-dev
