Author: Armin Rigo <ar...@tunes.org>
Branch: vmprof-review
Changeset: r78755:f99230e64cbc
Date: 2015-08-03 19:26 +0200
http://bitbucket.org/pypy/pypy/changeset/f99230e64cbc/
Log: Tweaks, and implementation of a wait-free, best-effort algorithm to handle multiple threads (some possibly in signals) that all try to write chunks of data to the same file diff --git a/rpython/rlib/rvmprof/cintf.py b/rpython/rlib/rvmprof/cintf.py --- a/rpython/rlib/rvmprof/cintf.py +++ b/rpython/rlib/rvmprof/cintf.py @@ -35,6 +35,9 @@ vmprof_disable = rffi.llexternal("rpython_vmprof_disable", [], rffi.INT, compilation_info=eci, save_err=rffi.RFFI_SAVE_ERRNO) +vmprof_write_buf = rffi.llexternal("rpython_vmprof_write_buf", + [rffi.CCHARP, rffi.LONG], + lltype.Void, compilation_info=eci) ## vmprof_register_virtual_function = rffi.llexternal( ## "vmprof_register_virtual_function", diff --git a/rpython/rlib/rvmprof/rvmprof.py b/rpython/rlib/rvmprof/rvmprof.py --- a/rpython/rlib/rvmprof/rvmprof.py +++ b/rpython/rlib/rvmprof/rvmprof.py @@ -8,6 +8,7 @@ from rpython.rtyper.lltypesystem import rffi MAX_CODES = 8000 +MAX_FUNC_NAME = 128 # ____________________________________________________________ @@ -29,7 +30,6 @@ def _cleanup_(self): self.is_enabled = False - self.ever_enabled = False self.fileno = -1 self._current_codes = None if sys.maxint == 2147483647: @@ -93,15 +93,13 @@ if not (1e-6 <= interval < 1.0): raise VMProfError("bad value for 'interval'") interval_usec = int(interval * 1000000.0) - # + + p_error = cintf.vmprof_init() + if p_error: + raise VMProfError(rffi.charp2str(p_error)) + self.fileno = fileno self._write_header(interval_usec) - if not self.ever_enabled: - if we_are_translated(): - p_error = cintf.vmprof_init() - if p_error: - raise VMProfError(rffi.charp2str(p_error)) - self.ever_enabled = True self._gather_all_code_objs() if we_are_translated(): # does not work untranslated @@ -126,6 +124,8 @@ raise VMProfError(os.strerror(rposix.get_saved_errno())) def _write_code_registration(self, uid, name): + if len(name) > MAX_FUNC_NAME: + name = name[:MAX_FUNC_NAME] b = self._current_codes if b is None: b = self._current_codes = StringBuilder() @@ -139,20 
+139,7 @@ def _flush_codes(self): buf = self._current_codes.build() self._current_codes = None - self._carefully_write(buf) - - def _carefully_write(self, buf): - fd = self.fileno - assert fd >= 0 - if not buf: - return - cintf.vmprof_ignore_signals(True) - try: - while len(buf) > 0: - num = os.write(fd, buf) - buf = buf[num:] - finally: - cintf.vmprof_ignore_signals(False) + cintf.vmprof_write_buf(buf, len(buf)) def _write_header(self, interval_usec): b = StringBuilder() @@ -164,7 +151,8 @@ b.append('\x04') # interp name b.append(chr(len('pypy'))) b.append('pypy') - self._carefully_write(b.build()) + buf = b.build() + cintf.vmprof_write_buf(buf, len(buf)) def _write_long_to_string_builder(l, b): diff --git a/rpython/rlib/rvmprof/src/rvmprof.c b/rpython/rlib/rvmprof/src/rvmprof.c --- a/rpython/rlib/rvmprof/src/rvmprof.c +++ b/rpython/rlib/rvmprof/src/rvmprof.c @@ -1,3 +1,22 @@ +/* VMPROF + * + * statistical sampling profiler specifically designed to profile programs + * which run on a Virtual Machine and/or bytecode interpreter, such as Python, + * etc. + * + * The logic to dump the C stack traces is partly stolen from the code in + * gperftools. + * The file "getpc.h" has been entirely copied from gperftools. + * + * Tested only on gcc, linux, x86_64. 
+ * + * Copyright (C) 2014-2015 + * Antonio Cuni - anto.c...@gmail.com + * Maciej Fijalkowski - fij...@gmail.com + * Armin Rigo - ar...@tunes.org + * + */ + #define _GNU_SOURCE 1 @@ -16,8 +35,6 @@ #endif -#include "rvmprof_getpc.h" -#include "rvmprof_base.h" #include <dlfcn.h> #include <assert.h> #include <pthread.h> @@ -28,6 +45,9 @@ #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> +#include "rvmprof_getpc.h" +#include "rvmprof_unwind.h" +#include "rvmprof_mt.h" /************************************************************/ @@ -57,6 +77,8 @@ if (!(unw_step = dlsym(libhandle, "_ULx86_64_step"))) goto error; } + if (prepare_concurrent_bufs() < 0) + return "out of memory"; return NULL; error: @@ -86,6 +108,9 @@ * ************************************************************* */ +#define MAX_FUNC_NAME 128 +#define MAX_STACK_DEPTH ((SINGLE_BUF_SIZE / sizeof(void *)) - 4) + #define MARKER_STACKTRACE '\x01' #define MARKER_VIRTUAL_IP '\x02' #define MARKER_TRAILER '\x03' @@ -94,32 +119,26 @@ static long profile_interval_usec = 0; static char atfork_hook_installed = 0; -static int _write_all(const void *buf, size_t bufsize) -{ - while (bufsize > 0) { - ssize_t count = write(profile_file, buf, bufsize); - if (count <= 0) - return -1; /* failed */ - buf += count; - bufsize -= count; - } - return 0; -} static void sigprof_handler(int sig_nr, siginfo_t* info, void *ucontext) { + if (ignore_signals) + return; int saved_errno = errno; - /* +#if 0 void* stack[MAX_STACK_DEPTH]; stack[0] = GetPC((ucontext_t*)ucontext); - int depth = frame_forcer(get_stack_trace(stack+1, MAX_STACK_DEPTH-1, ucontext)); + int depth = get_stack_trace(stack+1, MAX_STACK_DEPTH-1, ucontext); depth++; // To account for pc value in stack[0]; prof_write_stacktrace(stack, depth, 1); - */ +#endif errno = saved_errno; } -/************************************************************/ +/* ************************************************************* + * the setup and teardown functions + * 
************************************************************* + */ static int install_sigprof_handler(void) { @@ -218,6 +237,18 @@ return -1; } +static int _write_all(const void *buf, size_t bufsize) +{ + while (bufsize > 0) { + ssize_t count = write(profile_file, buf, bufsize); + if (count <= 0) + return -1; /* failed */ + buf += count; + bufsize -= count; + } + return 0; +} + static int close_profile(void) { int srcfd; @@ -272,5 +303,19 @@ return -1; if (remove_sigprof_handler() == -1) return -1; + shutdown_concurrent_bufs(profile_file); return close_profile(); } + +RPY_EXTERN +void rpython_vmprof_write_buf(char *buf, long size) +{ + struct profbuf_s *p = reserve_buffer(profile_file); + + if (size > SINGLE_BUF_SIZE) + size = SINGLE_BUF_SIZE; + memcpy(p->data, buf, size); + p->data_size = size; + + commit_buffer(profile_file, p); +} diff --git a/rpython/rlib/rvmprof/src/rvmprof.h b/rpython/rlib/rvmprof/src/rvmprof.h --- a/rpython/rlib/rvmprof/src/rvmprof.h +++ b/rpython/rlib/rvmprof/src/rvmprof.h @@ -1,5 +1,6 @@ RPY_EXTERN char *rpython_vmprof_init(void); -RPY_EXTERN void vmprof_ignore_signals(int); +RPY_EXTERN void rpython_vmprof_ignore_signals(int); RPY_EXTERN int rpython_vmprof_enable(int, long); RPY_EXTERN int rpython_vmprof_disable(void); +RPY_EXTERN void rpython_vmprof_write_buf(char *, long); diff --git a/rpython/rlib/rvmprof/src/rvmprof_mt.h b/rpython/rlib/rvmprof/src/rvmprof_mt.h new file mode 100644 --- /dev/null +++ b/rpython/rlib/rvmprof/src/rvmprof_mt.h @@ -0,0 +1,156 @@ +/* Support for multithreaded write() operations */ + +#include <unistd.h> +#include <sys/mman.h> +#include <string.h> + +#define SINGLE_BUF_SIZE (8192 - 2 * sizeof(unsigned int)) +#define MAX_NUM_BUFFERS 32 + +#if defined(__i386__) || defined(__amd64__) + static inline void write_fence(void) { asm("" : : : "memory"); } +#else + static inline void write_fence(void) { __sync_synchronize(); } +#endif + + +#define PROFBUF_UNUSED 0 +#define PROFBUF_FILLING 1 +#define PROFBUF_READY 2 + + 
+struct profbuf_s { + unsigned int data_size; + unsigned int data_offset; + char data[SINGLE_BUF_SIZE]; +}; + +static char volatile profbuf_state[MAX_NUM_BUFFERS]; +static struct profbuf_s *profbuf_all_buffers = NULL; +static int volatile profbuf_write_lock = 2; + + +static int prepare_concurrent_bufs(void) +{ + assert(sizeof(struct profbuf_s) == 8192); + + if (profbuf_all_buffers != NULL) { + munmap(profbuf_all_buffers, sizeof(struct profbuf_s) * MAX_NUM_BUFFERS); + profbuf_all_buffers = NULL; + } + profbuf_all_buffers = mmap(NULL, sizeof(struct profbuf_s) * MAX_NUM_BUFFERS, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, + -1, 0); + if (profbuf_all_buffers == MAP_FAILED) { + profbuf_all_buffers = NULL; + return -1; + } + memset((char *)profbuf_state, PROFBUF_UNUSED, sizeof(profbuf_state)); + profbuf_write_lock = 0; + return 0; +} + +static void _write_single_ready_buffer(int fd, long i) +{ + struct profbuf_s *p = &profbuf_all_buffers[i]; + ssize_t count = write(fd, p->data + p->data_offset, p->data_size); + if (count == p->data_size) { + profbuf_state[i] = PROFBUF_UNUSED; + } + else if (count > 0) { + p->data_offset += count; + p->data_size -= count; + } +} + +static void _write_ready_buffers(int fd) +{ + long i; + int has_write_lock = 0; + + for (i = 0; i < MAX_NUM_BUFFERS; i++) { + if (profbuf_state[i] == PROFBUF_READY) { + if (!has_write_lock) { + if (!__sync_bool_compare_and_swap(&profbuf_write_lock, 0, 1)) + return; /* can't acquire the write lock, give up */ + has_write_lock = 1; + } + _write_single_ready_buffer(fd, i); + } + } + if (has_write_lock) + profbuf_write_lock = 0; +} + +static struct profbuf_s *reserve_buffer(int fd) +{ + /* Tries to enter a region of code that fills one buffer. If + successful, returns the profbuf_s. It fails only if the + concurrent buffers are all busy (extreme multithreaded usage). + + This might call write() to emit the data sitting in + previously-prepared buffers. 
In case of write() error, the + error is ignored but unwritten data stays in the buffers. + */ + long i; + + _write_ready_buffers(fd); + + for (i = 0; i < MAX_NUM_BUFFERS; i++) { + if (profbuf_state[i] == PROFBUF_UNUSED && + __sync_bool_compare_and_swap(&profbuf_state[i], PROFBUF_UNUSED, + PROFBUF_FILLING)) { + struct profbuf_s *p = &profbuf_all_buffers[i]; + p->data_size = 0; + p->data_offset = 0; + return p; + } + } + /* no unused buffer found */ + return NULL; +} + +static void commit_buffer(int fd, struct profbuf_s *buf) +{ + /* Leaves a region of code that filled 'buf'. + + This might call write() to emit the data now ready. In case of + write() error, the error is ignored but unwritten data stays in + the buffers. + */ + + /* Make sure every thread sees the full content of 'buf' */ + write_fence(); + + /* Then set the 'ready' flag */ + long i = buf - profbuf_all_buffers; + assert(profbuf_state[i] == PROFBUF_FILLING); + profbuf_state[i] = PROFBUF_READY; + + if (!__sync_bool_compare_and_swap(&profbuf_write_lock, 0, 1)) { + /* can't acquire the write lock, ignore */ + } + else { + _write_single_ready_buffer(fd, i); + profbuf_write_lock = 0; + } +} + +static void shutdown_concurrent_bufs(int fd) +{ + retry: + usleep(1); + if (!__sync_bool_compare_and_swap(&profbuf_write_lock, 0, 2)) { + /* spin loop */ + goto retry; + } + + /* last attempt to flush buffers */ + int i; + for (i = 0; i < MAX_NUM_BUFFERS; i++) { + if (profbuf_state[i] == PROFBUF_READY) { + _write_single_ready_buffer(fd, i); + } + } +} diff --git a/rpython/rlib/rvmprof/src/rvmprof_base.h b/rpython/rlib/rvmprof/src/rvmprof_unwind.h rename from rpython/rlib/rvmprof/src/rvmprof_base.h rename to rpython/rlib/rvmprof/src/rvmprof_unwind.h _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit