Hi,
I want to post a mixin that turns any task into one that runs its
prerequisites in threads. Unlike MultiTask, which creates a separate
thread for each prerequisite and so causes thrashing when there are many
(especially for tasks that do IO, such as compile tasks), the mixin runs
them on a fixed-size thread pool. The pool size comes from num_jobs,
which defaults to the JOBS environment variable, or 2 if that is unset.
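To illustrate the difference, here is a minimal sketch of the bounded-pool
idea on its own, without Rake. The helper name run_in_pool and the :done
marker are made up for this example and are not part of the patch; the
point is only that 20 items get handled by at most 4 threads instead of 20.

```ruby
require 'thread'

# Hypothetical helper (not part of the patch): call the block once per item,
# using at most `size` worker threads that all pull from one shared queue.
def run_in_pool(items, size, &block)
  queue = Queue.new
  items.each { |item| queue << item }
  size.times { queue << :done }          # one stop marker per worker

  results = Queue.new                    # Queue is thread-safe
  workers = Array.new(size) do
    Thread.new do
      while (item = queue.pop) != :done
        results << block.call(item)
      end
    end
  end
  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

threads_seen = Queue.new
squares = run_in_pool((1..20).to_a, 4) do |n|
  threads_seen << Thread.current         # record which worker ran this item
  n * n
end

distinct = Array.new(threads_seen.size) { threads_seen.pop }.uniq.size
puts "#{squares.size} items handled by #{distinct} thread(s)"
```

Results come back in completion order, not submission order, so sort them
if the order matters.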
There are two files. The first is thread_pool.rb, which provides a
general thread pool implementation. I tried several implementations
and this is the only one that worked reliably for me. The second is
jobtask.rb, which contains the mixin. (In the patch the order is
reversed.)
The patch is attached.
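In case the mechanism is not obvious, here is a rough, self-contained
sketch of how such a mixin hooks in: a module overrides
invoke_prerequisites, calls super when only one job is requested, and
otherwise hands the prerequisites to a few worker threads. ToyTask and
ToyJobs are stand-ins invented for this example; they are not Rake's
classes, and the real patch uses ThreadPool rather than the inline
workers shown here.

```ruby
require 'thread'

# Hypothetical stand-in for a task class; this is not Rake's Task.
class ToyTask
  attr_reader :prerequisites, :log

  def initialize(prerequisites)
    @prerequisites = prerequisites
    @log = Queue.new                     # thread-safe record of finished work
  end

  # Serial default that the mixin overrides.
  def invoke_prerequisites
    @prerequisites.each { |prereq| run(prereq) }
  end

  def run(prereq)
    @log << prereq
  end
end

# Hypothetical mixin with the same shape as JobTask.
module ToyJobs
  attr_writer :num_jobs

  def num_jobs
    @num_jobs ||= Integer(ENV['JOBS'] || 2)
  end

  def invoke_prerequisites
    return super if num_jobs == 1        # fall back to the serial version

    queue = Queue.new
    @prerequisites.each { |prereq| queue << prereq }
    num_jobs.times { queue << :done }    # one stop marker per worker
    workers = Array.new(num_jobs) do
      Thread.new do
        while (prereq = queue.pop) != :done
          run(prereq)
        end
      end
    end
    workers.each(&:join)
  end
end

task = ToyTask.new(%w[a.o b.o c.o d.o])
task.extend(ToyJobs)                     # only this one object becomes threaded
task.num_jobs = 3
task.invoke_prerequisites

finished = Array.new(task.log.size) { task.log.pop }
puts finished.sort.inspect
```

With the patch the intent is the same: mix JobTask into a task (for
example with extend) and, optionally, set num_jobs or the JOBS
environment variable.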
HTH,
Ittay
--
Ittay Dror <[EMAIL PROTECTED]>
diff -ruN lib_old/jobtask.rb lib/jobtask.rb
--- lib_old/jobtask.rb 1970-01-01 02:00:00.000000000 +0200
+++ lib/jobtask.rb 2008-08-11 10:43:59.000000000 +0300
@@ -0,0 +1,48 @@
+require 'thread_pool'
+
+# Mix into a task to run its prerequisites on a fixed-size thread pool.
+module JobTask
+  def threaded_invoke_prerequisites(args, invocation_chain, num)
+    pool = ThreadPool.new(num) do |e|
+      puts e
+      raise "Prerequisites failed. Stopping all"
+    end
+    begin
+      @prerequisites.each do |p|
+        pool.add_work(p) do |r|
+          application[r].invoke_with_call_chain(args, invocation_chain)
+        end
+      end
+    ensure
+      # Always release the workers and wait for them, even if queueing failed.
+      pool.no_more_work
+      pool.join
+    end
+  end
+
+  def num_jobs=(n)
+    @num_jobs = n
+  end
+
+  # Defaults to the JOBS environment variable, or 2 when it is unset.
+  def num_jobs
+    unless instance_variable_defined?(:@num_jobs)
+      if ENV['JOBS']
+        @num_jobs = Integer(ENV['JOBS'])
+      else
+        @num_jobs = 2
+      end
+    end
+    @num_jobs
+  end
+
+  def invoke_prerequisites(args, invocation_chain)
+    case num_jobs
+    when 1
+      super(args, invocation_chain)
+    else
+      #TODO: get number of cpus
+      threaded_invoke_prerequisites(args, invocation_chain, num_jobs)
+    end
+  end
+end
diff -ruN lib_old/thread_pool.rb lib/thread_pool.rb
--- lib_old/thread_pool.rb 1970-01-01 02:00:00.000000000 +0200
+++ lib/thread_pool.rb 2008-08-11 10:45:02.000000000 +0300
@@ -0,0 +1,69 @@
+# Real Ruby Thread Pool | Piotr Wlodarek 2007 | http://ruby-rails.pl | MIT licensed
+
+require 'thread'
+require 'pp'
+
+# http://en.wikipedia.org/wiki/Thread_pool_pattern
+class ThreadPool
+
+  # 1. Initialize with an arbitrary number of threads
+  def initialize( threads_number = 1, &exception_handler )
+    @work_queue = Queue.new
+    @worker_threads = ThreadGroup.new
+
+    # Create all threads in advance: that's the thread pool after all
+    threads_number.times do
+
+      worker_thread = Thread.new do
+        # Body of the worker thread
+        begin
+          work_to_do = @work_queue.pop() # Blocking
+          break if work_to_do == :END_OF_WORK
+          begin
+            work_to_do.block.call( *work_to_do.args )
+          rescue Exception => e # Since they are swallowed by default
+            if exception_handler
+              exception_handler.call( e )
+            else
+              # Standard exception handling
+              dump = "Worker thread threw an exception:\n"
+              dump += "#{e.message}\n(#{e.class.name})\n"
+              dump += e.backtrace().join( "\n" ) + "\n"
+              print dump
+            end
+          end
+        end until false
+        # Worker thread ends gracefully :)
+      end
+
+      @worker_threads.add worker_thread
+    end
+  end
+
+  # 2. Add a job to the queue; args are passed to the block when it runs
+  def add_work( *args, &block )
+    @work_queue.push( Runnable.new( args, &block ) )
+  end
+
+  # 3. Allow worker threads to exit after finishing all queued work
+  def no_more_work
+    threads_n = @worker_threads.list.length
+    threads_n.times { @work_queue.push :END_OF_WORK }
+  end
+
+  # 4. Wait for all work to finish
+  def join
+    @worker_threads.list.each { |t| t.join }
+  end
+
+end
+
+class Runnable
+  attr_accessor :args, :block
+  def initialize( args, &block )
+    @args = args
+    @block = block
+  end
+end
+
+
_______________________________________________
Rake-devel mailing list
http://rubyforge.org/mailman/listinfo/rake-devel