|
Hi, there is some good news from the parallel front. I have replaced the semaphores that control the fork() and join() of threads by a new, busy-wait based, mechanism. That has reduced the fork() and join() times considerably: from the 20,000 cycle ballpark to less than 1000 cycles (the numbers still vary a lot due to caching). As a consequence, the break-even point (where parallel is faster than sequential) for e.g. Z←A+B has dropped from ⍴,Z of some 1000s to some 50s. I will now start to integrate the Parallel.cc prototype (see attachment) into GNU APL... /// Jürgen |
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fstream>
#include <iomanip>
#include <iostream>
#include <vector>
using namespace std;
// #define USE_SEMA to synchronize fork()/join() with POSIX semaphores
// instead of the (much faster) busy-wait on a volatile counter
//
// #define USE_SEMA

// the number of dummy operations every thread performs in do_work()
//
// #define WORK_LEN 100000
#define WORK_LEN 1

// #define VERBOSE tells you what is happening (and disables statistics)
//
// #define VERBOSE

// SL_INIT/SL_POST/SL_WAIT: a minimal semaphore-like API that maps either to
// real POSIX semaphores (USE_SEMA) or to a busy-wait on a volatile counter.
// NOTE(review): volatile is not a synchronization primitive in C++ (no
// atomicity, no memory barriers); the busy-wait variant relies on the strong
// memory model of x86-like CPUs — confirm before porting to other targets.
#ifdef USE_SEMA
# define SL_INIT(sptr, val) sem_init(sptr, 0, val)
# define SL_POST(sptr) sem_post(sptr)
# define SL_WAIT(sptr) sem_wait(sptr)
# define SL_t sem_t
#else
# define SL_INIT(sptr, val) *sptr = val;
# define SL_POST(sptr) ++*sptr;
# define SL_WAIT(sptr) while (*sptr <= 0) ; --*sptr;
# define SL_t volatile int
#endif

// assume pthread_getaffinity_np()/pthread_setaffinity_np() are available
#define HAVE_affinity_np
//-----------------------------------------------------------------------------
#ifdef VERBOSE
/// INFO(th, message, mil): print a progress message for thread th, serialized
/// by print_sema so that outputs from different threads do not interleave.
/// A mileage of -1 suppresses the "(mileage=...)" suffix.
# define INFO(th, message, mil) \
sem_wait(&print_sema); \
cout << "thread #" << th << " says: " << message; \
if (mil != -1) cout << " (mileage=" << mil << ")"; \
cout << endl; \
sem_post(&print_sema);
#else
# define INFO(th, x, m)
#endif // VERBOSE
/// an upper bound on the number of cores, so that we can
/// define some data structures statically
enum
{
  LOG_THREAD_LIMIT = 8,                   ///< depth of the fork/join tree
  THREAD_LIMIT = 1 << LOG_THREAD_LIMIT    ///< max. number of threads (256)
};
/// a semaphore to coordinate printouts from several threads
sem_t print_sema;
/// a semaphore to coordinate thread creation
sem_t pthread_create_sema;
//-----------------------------------------------------------------------------
/// return the current value of the CPU's time stamp counter (x86 RDTSC)
inline uint64_t
cycle_counter()
{
unsigned int tsc_low, tsc_high;

   __asm__ __volatile__ ("rdtsc" : "=a" (tsc_low), "=d" (tsc_high));

const uint64_t high = tsc_high;
   return (high << 32) | tsc_low;
}
//-----------------------------------------------------------------------------
/// the per-thread data of one thread in the fork/join tree. All contexts
/// live in the static array thread_contexts[]; a context's thread_id is its
/// index in that array, and thread #0 is the master (the main() thread).
struct Thread_context
{
   /// constructor: derive our thread_id from our position in thread_contexts[]
   Thread_context()
   : thread_id(this - thread_contexts)
   {}

   /// start parallel execution of work: busy-wait until the thread that
   /// forked us (join_thread) has advanced its mileage one step past ours,
   /// then advance our own mileage (which in turn releases the threads that
   /// we fork, since they busy-wait on our mileage in the same way)
   void fork()
   {
      // wait until our master has moved forward
      //
      const int waiting_for_mileage = mileage + 1;
      while (thread_contexts[join_thread].mileage != waiting_for_mileage)   /* busy wait */;

      INFO(thread_id, "forked", mileage)

      // fork our sub-threads
      //
      ++mileage;

      INFO(thread_id,
           forked_threads_count << " worker-thread(s) forked", mileage)
   }

   /// end parallel execution of work
   inline void join();

   /// the counter that controls forking and joining of threads.
   /// NOTE(review): volatile provides neither atomicity nor memory ordering
   /// in C++; this busy-wait scheme relies on x86-like cache coherence —
   /// confirm before porting to weaker memory models.
   volatile int mileage;

   /// initialize all but thread and thread_id
   void init(int thread_count, const vector<int> & cores);

   /// forget the forked threads (init() will set them up again)
   void reset()
      { forked_threads_count = 0; }

   /// our index in thread_contexts[]
   const int thread_id;

   /// the threads that we fork (at most one per level of the fork tree)
   int forked_threads[LOG_THREAD_LIMIT];

   /// the number of valid entries in forked_threads[]
   int forked_threads_count;

   /// the thread that forked us (and whose mileage our fork() waits on)
   int join_thread;

   /// the POSIX thread that runs with this context
   pthread_t thread;

   int CPU;   // the core to which thread binds

   /// one statically allocated context per possible thread
   static Thread_context thread_contexts[];

protected:
   /// remember that we fork thread peer
   void add_forked(int peer)
      { forked_threads[forked_threads_count++] = peer; }

   /// remember that thread peer forks us
   void add_joined(int peer)
      { join_thread = peer; }
} Thread_context::thread_contexts[THREAD_LIMIT];
// ----------------------------------------------------------------------------
/// end parallel execution of work: busy-wait until every thread that we
/// forked has advanced its mileage past ours, then (unless we are the
/// master, thread #0) advance our own mileage to release our forker,
/// which busy-waits on it in its own join().
void
Thread_context::join()
{
   // wait for the threads that we forked
   //
   for (int f = 0; f < forked_threads_count; ++f)
       {
         // wait until sub-thread f has moved forward (finished its work
         // and joined all of its own sub-threads)
         //
         const int sub = forked_threads[f];
         const Thread_context & sub_ctx = thread_contexts[sub];
         const int waiting_for_mileage = mileage + 1;
         while (sub_ctx.mileage != waiting_for_mileage)   /* busy wait */ ;

         INFO(thread_id, "worker-thread #" << sub << " (" << (f + 1) << " of "
                         << forked_threads_count << ") has joined", mileage)
       }

   // inform our forker (if any) that we are done. The forker only observes
   // our mileage, so incrementing it suffices (the previously declared but
   // unused reference to the forker's context has been removed).
   //
   if (thread_id)   // not thread #0
      {
        ++mileage;
        INFO(thread_id, "joining thread #" << join_thread, mileage)
      }
}
//-----------------------------------------------------------------------------
void do_work(int id, int mileage);
/// initialize this context for a run with \b thread_count threads that are
/// bound (round-robin) to the cores in \b cores. Builds the binary fork/join
/// tree: thread T forks peers T ^ dist for the power-of-two distances dist
/// above T's lowest set bit, and every forked peer later joins T.
void Thread_context::init(int thread_count, const vector<int> & cores)
{
   CPU = -1;
   if (thread_id >= thread_count)   return;   // this context is not used

   CPU = cores[thread_id % cores.size()];

   for (int dist = THREAD_LIMIT >> 1; dist; dist >>= 1)
       {
         const int mask = dist - 1;
         if (thread_id & mask)       continue;   // lower bits set: no fork at this level
         const int peer = thread_id ^ dist;
         if (peer >= thread_count)   continue;   // peer does not exist
         if (thread_id & dist)       continue;   // we are the forked one at this level

         // we fork peer and peer joins us.
         //
         this->add_forked(peer);
         thread_contexts[peer].add_joined(thread_id);
       }

   mileage = 0;

   // tell the user what this thread will do. (The former check
   // thread_id < thread_count here was redundant: we returned early above
   // whenever it was false.)
   //
   cerr << "thread #" << thread_id << " will start ";
   if      (forked_threads_count == 0)   cerr << "no threads";
   else if (forked_threads_count == 1)   cerr << "1 thread";
   else    cerr << forked_threads_count << " threads";

   for (int c = 0; c < forked_threads_count; ++c)
       cerr << " #" << forked_threads[c];

   if (thread_id)   cerr << " and will join thread #" << join_thread;
   cerr << endl;
}
//-----------------------------------------------------------------------------
uint64_t work_start;              ///< cycle counter before all threads forked
int64_t work_1[THREAD_LIMIT];     ///< cycle counters before individual work
int64_t work_2[THREAD_LIMIT];     ///< cycle counters after individual work
int dummy[THREAD_LIMIT] = { 0 };  ///< per-thread scratch cell updated by do_work()
uint64_t work_end;                ///< cycle counter after all threads joined
/// the (dummy) work that every thread performs between fork() and join().
/// The cycle counter is sampled into work_1[]/work_2[] around the loop so
/// that print_times() can compute the fork and join latencies later.
void
do_work(int thread_id, int mileage)
{
   INFO(thread_id, "start work", mileage)

   work_1[thread_id] = cycle_counter();

int step = 0;
   while (step++ < WORK_LEN)
      dummy[thread_id] = dummy[thread_id] * 333333 % 444444;

   work_2[thread_id] = cycle_counter();

   INFO(thread_id, "done work", mileage)
}
//-----------------------------------------------------------------------------
void *
pthread_main(void * arg)
{
Thread_context & ctx = *(Thread_context *)arg;
INFO(ctx.thread_id, "thread " << ctx.thread_id << " created", ctx.mileage);
sem_post(&pthread_create_sema);
for (;;)
{
ctx.fork();
do_work(ctx.thread_id, ctx.mileage);
ctx.join();
}
// not reached
//
return 0;
}
//-----------------------------------------------------------------------------
/// create (thread_count - 1) worker threads (the main thread is thread #0),
/// initialize all Thread_contexts, and bind every thread to its core if
/// affinity functions are available. Returns the (possibly reduced)
/// number of threads.
int
setup_threads(int thread_count, const vector<int> & cores)
{
   // limit thread_count by THREAD_LIMIT
   //
   if (thread_count > THREAD_LIMIT)   thread_count = THREAD_LIMIT;

   // clear and initialize all Thread_contexts
   //
   cerr << "\nInitializing thread contexts (thread_count = "
        << thread_count << ")..." << endl;

   for (int c = 0; c < THREAD_LIMIT; ++c)
       Thread_context::thread_contexts[c].reset();

   for (int c = 0; c < THREAD_LIMIT; ++c)
       Thread_context::thread_contexts[c].init(thread_count, cores);

   // the main thread is #0 and we create worker-threads for #1 #2 ...
   //
   cerr << "\nCreating worker threads ..." << endl;

   Thread_context::thread_contexts[0].thread = pthread_self();
   for (int c = 1; c < thread_count; ++c)
       {
         Thread_context * ctx = Thread_context::thread_contexts + c;

         // check the pthread_create() result (consistent with the
         // pthread_setaffinity_np() error handling below)
         //
         const int err = pthread_create(&(ctx->thread), /* attr */ 0,
                                        pthread_main, ctx);
         if (err)
            {
              cerr << "pthread_create() failed with error " << err << endl;
              exit(4);
            }

         // wait until new thread has reached its loop
         //
         sem_wait(&pthread_create_sema);
       }

#ifdef HAVE_affinity_np
   // bind threads to cores
   //
   cerr << "\nBinding threads to cores..." << endl;

   for (int c = 0; c < thread_count; ++c)
       {
         Thread_context * ctx = Thread_context::thread_contexts + c;
         cpu_set_t cpus;
         CPU_ZERO(&cpus);
         CPU_SET(ctx->CPU, &cpus);
         const int err = pthread_setaffinity_np(ctx->thread,
                                                sizeof(cpu_set_t), &cpus);
         if (err)
            {
              cerr << "pthread_setaffinity_np() failed with error "
                   << err << endl;
              exit(3);
            }

         cerr << "bound thread #" << c << " to core " << ctx->CPU << endl;
       }
#endif // HAVE_affinity_np

   return thread_count;
}
//-----------------------------------------------------------------------------
#ifdef HAVE_affinity_np
/// determine the cores available to this process (at most THREAD_LIMIT),
/// append their numbers to \b cores, and return their count
static int
setup_cores(vector<int> & cores, cpu_set_t & CPUs)
{
   const int err = pthread_getaffinity_np(pthread_self(),
                                          sizeof(cpu_set_t), &CPUs);
   if (err)
      {
        cerr << "pthread_getaffinity_np() failed with error "
             << err << endl;
        exit(2);
      }

   // get available CPUs (cores) but at most THREAD_LIMIT
   //
   for (int c = 0; c < THREAD_LIMIT; ++c)
       {
         if (CPU_ISSET(c, &CPUs))   cores.push_back(c);
       }

   cout << "\nThis machine has " << cores.size() << " cores:";
   for (size_t c = 0; c < cores.size(); ++c)   cout << " " << cores[c];
   cout << endl;

   return cores.size();
}
#else
#endif // HAVE_affinity_np
//-----------------------------------------------------------------------------
/// print execution statistics for one benchmark pass to \b out and write the
/// per-thread timings to the file cN_pM.data (N = thread_count, M = pass)
/// for later plotting: column 2 is the fork latency (cycles from work_start
/// until the thread started working), column 3 the finish time relative to
/// the earliest finisher.
void
print_times(ostream & out, int thread_count, int pass)
{
   // print execution statistics, unless VERBOSE is on (in which case that
   // makes no sense.
   //
#ifdef VERBOSE
   out << endl << "No statistics because VERBOSE was #defined" << endl;
   return;
#endif

   out << " " << thread_count << " cores/threads "
       << (work_end - work_start) << " cycles total" << endl;

char filename[1000];
   // snprintf instead of sprintf: never overrun filename[]
   snprintf(filename, sizeof(filename), "c%d_p%d.data", thread_count, pass);

   // figure when the first thread was finished. That is the time from
   // where we measure the join time
   //
int64_t work_2_min = work_2[0];
   for (int c = 1; c < thread_count; ++c)
       {
         if (work_2_min > work_2[c])   work_2_min = work_2[c];
       }

ofstream plot(filename);
   if (!plot)   // e.g. directory not writable: keep the statistics on 'out'
      {
        out << "(could not open " << filename << " for writing)" << endl;
        return;
      }

   for (int c = 0; c < thread_count; ++c)
       {
         plot << " " << setw(3) << c
              << ", " << (work_1[c] - work_start)
              << ", " << (work_2[c] - work_2_min) << endl;
       }
}
//-----------------------------------------------------------------------------
/// run the benchmark: Parallel [thread_count] — or, with --seq2 N, print
/// the sequence "2 3 ... N" (a helper for portable makefile loops)
int
main(int argc, const char * argv[])
{
   if (argc == 3 && !strcmp(argv[1], "--seq2"))
      {
        // print 2 3 4 .... argv[2] (for portable makefile loop)
        //
        printf("2");
        for (int j = 3; j <= atoi(argv[2]); ++j)   printf(" %d", j);
        printf("\n");
        return 0;
      }

int thread_count = 10;
   if (argc >= 2)        thread_count = atoi(argv[1]);
   if (thread_count < 1) thread_count = 1;   // guard against bad arguments

vector<int> cores;

#ifdef HAVE_affinity_np
   // determine the available cores and remember them. The cast avoids a
   // signed/unsigned comparison (which would misbehave for negative counts).
   //
cpu_set_t CPUs;
   setup_cores(cores, CPUs);
   if ((int)cores.size() > thread_count)   cores.resize(thread_count);
#else
   for (int t = 0; t < thread_count; ++t)   cores.push_back(t);
#endif // HAVE_affinity_np

   // setup for parallel execution. This is normally done only once (unless
   // the number of cores/threads is changed.
   //
   sem_init(&print_sema, 0, 1);
   sem_init(&pthread_create_sema, 0, 0);

   thread_count = setup_threads(thread_count, cores);
   cerr << endl;

   // we run the loop several times so that we can see cache effects
   //
   for (int pass = 0; pass < 5; ++pass)
       {
         cout << "Pass " << pass
              << " --------------------------------------------" << endl;

         work_start = cycle_counter();

         {
           // do the same as the workers, but without calling pthread_main().
           // this is to avoid unnecessary tests for worker vs. master in
           // pthread_main().
           //
           Thread_context & master = Thread_context::thread_contexts[0];
           ++master.mileage;   // release the workers (they wait in fork())
           do_work(master.thread_id, master.mileage);
           master.join();
           ++master.mileage;   // keep in sync: workers increment twice per
                               // pass (once in fork(), once in join())
         }

         work_end = cycle_counter();

#ifndef VERBOSE
         print_times(cout, thread_count, pass);
#endif
       }

   return 0;
}
//-----------------------------------------------------------------------------
#!/usr/bin/gnuplot
# plot the per-thread timing files cN_pM.data written by print_times():
# column 1 = thread id, column 2 = fork latency (cycles from work_start
# until the thread started working), column 3 = finish time relative to
# the earliest finisher.
# NOTE(review): the .data files are comma-separated; gnuplot may need
# set datafile separator "," — confirm with the generated files.
set term xterm
# fork latencies of passes 1-4 (4 cores/threads)
plot 'c4_p1.data' using 1:2 with lines lw 3 lc 1, \
'c4_p2.data' using 1:2 with lines lw 3 lc 2, \
'c4_p3.data' using 1:2 with lines lw 3 lc 3, \
'c4_p4.data' using 1:2 with lines lw 3 lc 4
pause -1
# join latencies of passes 1-4 (4 cores/threads)
plot 'c4_p1.data' using 1:3 with lines lw 3 lc 5, \
'c4_p2.data' using 1:3 with lines lw 3 lc 6, \
'c4_p3.data' using 1:3 with lines lw 3 lc 7, \
'c4_p4.data' using 1:3 with lines lw 3 lc 8
pause -1
# build the Parallel benchmark and run it for various thread counts
all: Parallel

Parallel: Parallel.cc
	g++ -O2 -o $@ $< -lpthread

# e.g. 'make 4' runs the benchmark with 4 threads and packs the .data files
2 4 8 80: all
	echo Running benchmarks for 1 ... $@ cores...
	# for cnt in `./Parallel --seq2 $@` ; do \
	# ./Parallel $$cnt ; done
	./Parallel $@
	tar czf result_$@.tgz *.data
	# rm -f cores_*

clean:
	rm -f Parallel *.data result_*.tgz
