Changeset: 2e146af54123 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2e146af54123 Added Files: gdk/shared_memory.c gdk/shared_memory.h sql/backends/monet5/Tests/pyapi10.sql sql/backends/monet5/Tests/pyapi10.stable.err sql/backends/monet5/Tests/pyapi10.stable.out sql/backends/monet5/Tests/pyapi11.sql sql/backends/monet5/Tests/pyapi11.stable.err sql/backends/monet5/Tests/pyapi11.stable.out Modified Files: gdk/Makefile.ag gdk/gdk.h gdk/gdk_heap.c monetdb5/extras/pyapi/Tests/pyapi_types_numeric.malC monetdb5/extras/pyapi/pyapi.c monetdb5/extras/pyapi/pytypes.h monetdb5/optimizer/opt_support.c sql/backends/monet5/Tests/All tools/merovingian/daemon/forkmserver.c tools/merovingian/daemon/merovingian.c tools/merovingian/utils/properties.c Branch: pyapi Log Message:
Reworked multiprocessing to use fork() and shared memory rather than Python Pools. diffs (truncated from 2282 to 300 lines): diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag --- a/gdk/Makefile.ag +++ b/gdk/Makefile.ag @@ -31,12 +31,12 @@ lib_gdk = { gdk_unique.c \ gdk_firstn.c \ bat.feps bat1.feps bat2.feps \ - libbat.rc + libbat.rc shared_memory.h shared_memory.c LIBS = ../common/options/libmoptions \ ../common/stream/libstream \ ../common/utils/libmutils \ $(MATH_LIBS) $(SOCKET_LIBS) $(zlib_LIBS) $(BZ_LIBS) \ - $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) $(KVM_LIBS) + $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) $(KVM_LIBS) } headers_h = { diff --git a/gdk/gdk.h b/gdk/gdk.h --- a/gdk/gdk.h +++ b/gdk/gdk.h @@ -646,6 +646,7 @@ typedef enum { STORE_PRIV = 2, /* BAT copy of copy-on-write mmap */ STORE_CMEM = 3, /* Indicates the value is stored in regular C memory rather than GDK memory.*/ STORE_NOWN = 4, /* Indicates that the bat does not own the chunk of memory and is not in charge of freeing it.*/ + STORE_SHARED = 5, /* Indicattes that the bat uses shared memory. */ STORE_INVALID /* invalid value, used to indicate error */ } storage_t; diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c --- a/gdk/gdk_heap.c +++ b/gdk/gdk_heap.c @@ -47,6 +47,7 @@ #include "monetdb_config.h" #include "gdk.h" #include "gdk_private.h" +#include "shared_memory.h" static void * HEAPcreatefile(int farmid, size_t *maxsz, const char *fn) @@ -560,7 +561,11 @@ HEAPfree(Heap *h, int remove) } else if (h->storage == STORE_CMEM) { //heap is stored in regular C memory rather than GDK memory free(h->base); - } else { /* mapped file, or STORE_PRIV */ + } else if (h->storage == STORE_SHARED) + { + release_shared_memory(h->base); + } + else { /* mapped file, or STORE_PRIV */ gdk_return ret = GDKmunmap(h->base, h->size); if (ret != GDK_SUCCEED) { diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c new file mode 100644 --- /dev/null +++ b/gdk/shared_memory.c @@ -0,0 +1,325 @@ + +#include "shared_memory.h" + +#ifndef WIN32 + +#include <stdlib.h> +#include <assert.h> +#include <string.h> + +#include <sys/types.h> +#include <sys/ipc.h> +#include <sys/shm.h> +#include <sys/wait.h> +#include <unistd.h> +#include <sys/mman.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <sched.h> +#include <errno.h> +#include <sys/sem.h> + +#include "monetdb_config.h" +#include "gdk.h" + +static int *shm_memory_ids; +static void **shm_ptrs; +static int shm_unique_id = 1; +static int shm_current_id = 0; +static int shm_max_id = 32; +static bool shm_is_initialized = false; +static char shm_keystring[] = "."; + +void *init_shared_memory(int id, size_t size, int flags); +void store_shared_memory(int memory_id, void *ptr); +bool release_shared_memory_id(int memory_id, void *ptr); + +int init_process_semaphore(int id, int count, int flags); + +void initialize_shared_memory(void) +{ + if (shm_is_initialized) return; + + shm_ptrs = malloc(shm_max_id * sizeof(void*)); + shm_memory_ids = malloc(shm_max_id * sizeof(int)); + shm_current_id = 0; + shm_max_id = 32; + shm_unique_id = 2; + + shm_is_initialized = true; +} + +void store_shared_memory(int memory_id, void *ptr) +{ + int i; + + assert(shm_is_initialized); + + + for(i = 0; i < shm_current_id; i++) + { + if (shm_ptrs[i] == NULL) + { + shm_memory_ids[i] = memory_id; + shm_ptrs[i] = ptr; + return; + } + } + + if (shm_current_id >= shm_max_id) + { + void **new_ptrs = malloc(shm_max_id * 2 * sizeof(void*)); + int *new_memory_ids = malloc(shm_max_id * 2 * sizeof(int)); + + memcpy(new_ptrs, shm_ptrs, sizeof(void*) * shm_max_id); + memcpy(new_memory_ids, shm_memory_ids, sizeof(int) * shm_max_id); + + free(shm_ptrs); free(shm_memory_ids); + shm_ptrs = new_ptrs; shm_memory_ids = new_memory_ids; + shm_max_id *= 2; + } + + + shm_memory_ids[shm_current_id] = memory_id; + shm_ptrs[shm_current_id] = ptr; + shm_current_id++; +} + +int get_unique_shared_memory_id(int offset) +{ + int id; + assert(shm_is_initialized); + + id = shm_unique_id; + shm_unique_id += offset; + return id; +} + +void* create_shared_memory(int id, size_t size) +{ + return init_shared_memory(id, size, IPC_CREAT); +} + +void *get_shared_memory(int id, size_t size) +{ + return init_shared_memory(id, size, 0); +} + +void *init_shared_memory(int id, size_t size, int flags) +{ + int shmid; + void *ptr; + int i; + int key = ftok(shm_keystring, id); + if (key == (key_t) -1) + { + perror("ftok"); + return NULL; + } + + assert(shm_is_initialized); + + shmid = shmget(key, size, flags | 0666); + if (shmid < 0) + { + perror("shmget"); + return NULL; + } + + //check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer + for(i = 0; i < shm_current_id; i++) + { + if (shm_memory_ids[i] == shmid) + { + return shm_ptrs[i]; + } + } + + ptr = shmat(shmid, NULL, 0); + if (ptr == (void*)-1) + { + perror("shmat"); + return NULL; + } + + store_shared_memory(shmid, ptr); + return ptr; +} + +bool release_shared_memory(void *ptr) +{ + int i = 0; + int memory_id = -1; + + assert(shm_is_initialized); + + //find the memory_id accompanying the given pointer in the structure + for(i = 0; i < shm_current_id; i++) + { + if (shm_ptrs[i] == ptr) + { + memory_id = shm_memory_ids[i]; + shm_memory_ids[i] = 0; + shm_ptrs[i] = NULL; + break; + } + } + + assert(memory_id); + //actually release the memory at the given ID + return release_shared_memory_id(memory_id, ptr); +} + +bool release_shared_memory_id(int memory_id, void *ptr) +{ + if (shmctl(memory_id, IPC_RMID, NULL) == -1) + { + perror("shmctl"); + return false; + } + if (shmdt(ptr) == -1) + { + perror("shmdt"); + return false; + } + return true; +} + +int init_process_semaphore(int id, int count, int flags) +{ + int key = ftok(shm_keystring, id); + int semid = -1; + if (key == (key_t) -1) + { + perror("ftok"); + return semid; + } + semid = semget(key, count, flags | 0666); + if (semid < 0) + { + perror("semget failed: "); + } + return semid; +} + +int create_process_semaphore(int id, int count) +{ + return init_process_semaphore(id, count, IPC_CREAT); +} + +int get_process_semaphore(int sem_id, int count) +{ + return init_process_semaphore(sem_id, count, 0); +} + +int get_semaphore_value(int sem_id, int number) +{ + int semval = semctl(sem_id, number, GETVAL, 0); + if (semval < 0) + { + perror("semctl failed: "); + } + return semval; +} + +bool change_semaphore_value(int sem_id, int number, int change) +{ + struct sembuf buffer; + buffer.sem_num = number; + buffer.sem_op = change; + buffer.sem_flg = 0; + + if (semop(sem_id, &buffer, 1) < 0) + { + perror("semop failed: "); + return false; + } + return true; +} + +bool release_process_semaphore(int sem_id) +{ + if (semctl(sem_id, 0, IPC_RMID) < 0) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list