Changeset: 2e146af54123 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2e146af54123
Added Files:
        gdk/shared_memory.c
        gdk/shared_memory.h
        sql/backends/monet5/Tests/pyapi10.sql
        sql/backends/monet5/Tests/pyapi10.stable.err
        sql/backends/monet5/Tests/pyapi10.stable.out
        sql/backends/monet5/Tests/pyapi11.sql
        sql/backends/monet5/Tests/pyapi11.stable.err
        sql/backends/monet5/Tests/pyapi11.stable.out
Modified Files:
        gdk/Makefile.ag
        gdk/gdk.h
        gdk/gdk_heap.c
        monetdb5/extras/pyapi/Tests/pyapi_types_numeric.malC
        monetdb5/extras/pyapi/pyapi.c
        monetdb5/extras/pyapi/pytypes.h
        monetdb5/optimizer/opt_support.c
        sql/backends/monet5/Tests/All
        tools/merovingian/daemon/forkmserver.c
        tools/merovingian/daemon/merovingian.c
        tools/merovingian/utils/properties.c
Branch: pyapi
Log Message:

Reworked multiprocessing to use fork() and shared memory rather than Python 
Pools.


diffs (truncated from 2282 to 300 lines):

diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
--- a/gdk/Makefile.ag
+++ b/gdk/Makefile.ag
@@ -31,12 +31,12 @@ lib_gdk = {
                gdk_unique.c \
                gdk_firstn.c \
                bat.feps bat1.feps bat2.feps \
-               libbat.rc
+               libbat.rc shared_memory.h shared_memory.c
        LIBS = ../common/options/libmoptions \
                ../common/stream/libstream \
                ../common/utils/libmutils \
                $(MATH_LIBS) $(SOCKET_LIBS) $(zlib_LIBS) $(BZ_LIBS) \
-               $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) 
$(KVM_LIBS)
+               $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) 
$(KVM_LIBS) 
 }
 
 headers_h = {
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -646,6 +646,7 @@ typedef enum {
        STORE_PRIV = 2,         /* BAT copy of copy-on-write mmap */
        STORE_CMEM = 3,     /* Indicates the value is stored in regular C 
memory rather than GDK memory.*/
        STORE_NOWN = 4,     /* Indicates that the bat does not own the chunk of 
memory and is not in charge of freeing it.*/
+       STORE_SHARED = 5,   /* Indicates that the bat uses shared memory. */
        STORE_INVALID           /* invalid value, used to indicate error */
 } storage_t;
 
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -47,6 +47,7 @@
 #include "monetdb_config.h"
 #include "gdk.h"
 #include "gdk_private.h"
+#include "shared_memory.h"
 
 static void *
 HEAPcreatefile(int farmid, size_t *maxsz, const char *fn)
@@ -560,7 +561,11 @@ HEAPfree(Heap *h, int remove)
                } else if (h->storage == STORE_CMEM) {
                        //heap is stored in regular C memory rather than GDK 
memory
                        free(h->base);
-               } else {        /* mapped file, or STORE_PRIV */
+               } else if (h->storage == STORE_SHARED)
+               {
+                       release_shared_memory(h->base);
+               }
+               else {  /* mapped file, or STORE_PRIV */
                        gdk_return ret = GDKmunmap(h->base, h->size);
 
                        if (ret != GDK_SUCCEED) {
diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
new file mode 100644
--- /dev/null
+++ b/gdk/shared_memory.c
@@ -0,0 +1,325 @@
+
+#include "shared_memory.h"
+
+#ifndef WIN32
+
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include <sys/types.h>
+#include <sys/ipc.h> 
+#include <sys/shm.h> 
+#include <sys/wait.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h> 
+#include <sched.h>
+#include <errno.h>
+#include <sys/sem.h>
+
+#include "monetdb_config.h"
+#include "gdk.h"
+
+static int *shm_memory_ids; /* segment ids, parallel to shm_ptrs (same index = same segment) */
+static void **shm_ptrs; /* attached addresses; a NULL entry marks a slot that can be reused */
+static int shm_unique_id = 1; /* next value returned by get_unique_shared_memory_id() */
+static int shm_current_id = 0; /* number of table slots handed out so far */
+static int shm_max_id = 32; /* allocated capacity, in entries, of both tables */
+static bool shm_is_initialized = false; /* set to true by initialize_shared_memory() */
+static char shm_keystring[] = "."; /* pathname passed to ftok() when deriving IPC keys */
+
+void *init_shared_memory(int id, size_t size, int flags); /* defined later in this file */
+void store_shared_memory(int memory_id, void *ptr); /* defined later in this file */
+bool release_shared_memory_id(int memory_id, void *ptr); /* defined later in this file */
+
+int init_process_semaphore(int id, int count, int flags); /* defined later in this file */
+
+/*
+ * Set up the process-local bookkeeping tables used to track attached
+ * shared memory segments.
+ *
+ * Calling this again after a successful initialization is a no-op.
+ * If either table cannot be allocated, the failure is reported with
+ * perror(), nothing is kept and the module stays uninitialized, so the
+ * asserts in the other entry points still catch use of missing tables.
+ */
+void initialize_shared_memory(void)
+{
+       void **ptrs;
+       int *memory_ids;
+
+       if (shm_is_initialized) return;
+
+       shm_max_id = 32;
+       ptrs = malloc(shm_max_id * sizeof(void*));
+       memory_ids = malloc(shm_max_id * sizeof(int));
+       if (ptrs == NULL || memory_ids == NULL)
+       {
+               perror("malloc");
+               /* free(NULL) is a no-op, so both can be released unconditionally */
+               free(ptrs);
+               free(memory_ids);
+               return;
+       }
+
+       shm_ptrs = ptrs;
+       shm_memory_ids = memory_ids;
+       shm_current_id = 0;
+       shm_unique_id = 2;
+
+       shm_is_initialized = true;
+}
+
+/*
+ * Record a (segment id, attached address) pair in the bookkeeping tables.
+ *
+ * memory_id: segment id to remember.
+ * ptr:       address the segment is attached at.
+ *
+ * A slot whose address is NULL is reused first; otherwise the pair is
+ * appended, doubling the capacity of both tables when they are full.
+ * If the tables cannot be grown, the failure is reported with perror()
+ * and the existing tables are left untouched; the pair is then NOT
+ * recorded.
+ */
+void store_shared_memory(int memory_id, void *ptr)
+{
+       int i;
+
+       assert(shm_is_initialized);
+
+       /* reuse a free slot, if there is one */
+       for (i = 0; i < shm_current_id; i++)
+       {
+               if (shm_ptrs[i] == NULL)
+               {
+                       shm_memory_ids[i] = memory_id;
+                       shm_ptrs[i] = ptr;
+                       return;
+               }
+       }
+
+       if (shm_current_id >= shm_max_id)
+       {
+               size_t new_max = (size_t) shm_max_id * 2;
+               void **new_ptrs = malloc(new_max * sizeof(void*));
+               int *new_memory_ids = malloc(new_max * sizeof(int));
+
+               /* never copy into a failed allocation; keep the old tables valid */
+               if (new_ptrs == NULL || new_memory_ids == NULL)
+               {
+                       perror("malloc");
+                       free(new_ptrs);
+                       free(new_memory_ids);
+                       return;
+               }
+
+               memcpy(new_ptrs, shm_ptrs, sizeof(void*) * shm_max_id);
+               memcpy(new_memory_ids, shm_memory_ids, sizeof(int) * shm_max_id);
+
+               free(shm_ptrs);
+               free(shm_memory_ids);
+               shm_ptrs = new_ptrs;
+               shm_memory_ids = new_memory_ids;
+               shm_max_id *= 2;
+       }
+
+       shm_memory_ids[shm_current_id] = memory_id;
+       shm_ptrs[shm_current_id] = ptr;
+       shm_current_id++;
+}
+
+/*
+ * Return the current value of the module's id counter and advance the
+ * counter by `offset`.
+ */
+int get_unique_shared_memory_id(int offset)
+{
+       int first_id;
+
+       assert(shm_is_initialized);
+
+       first_id = shm_unique_id;
+       shm_unique_id = first_id + offset;
+       return first_id;
+}
+
+/*
+ * Obtain the shared memory segment for `id` with the given size, passing
+ * IPC_CREAT to init_shared_memory(). Returns whatever that call returns.
+ */
+void* create_shared_memory(int id, size_t size)
+{
+       void *segment = init_shared_memory(id, size, IPC_CREAT);
+
+       return segment;
+}
+
+/*
+ * Obtain the shared memory segment for `id` without requesting creation
+ * (no extra flags are passed to init_shared_memory()). Returns whatever
+ * that call returns.
+ */
+void *get_shared_memory(int id, size_t size)
+{
+       const int no_flags = 0;
+
+       return init_shared_memory(id, size, no_flags);
+}
+
+/*
+ * Obtain the System V shared memory segment identified by `id` and attach
+ * it to this process.
+ *
+ * id:    project id handed to ftok() together with shm_keystring.
+ * size:  segment size handed to shmget().
+ * flags: extra shmget() flags (e.g. IPC_CREAT); 0666 permissions are
+ *        always or-ed in.
+ *
+ * If the segment is already tracked in the bookkeeping tables, its
+ * existing address is returned and no second attach is made. Otherwise the
+ * segment is attached and recorded via store_shared_memory().
+ *
+ * Returns the attached address, or NULL after reporting the failing call
+ * with perror().
+ */
+void *init_shared_memory(int id, size_t size, int flags)
+{
+       key_t key;
+       int shmid;
+       void *ptr;
+       int i;
+
+       assert(shm_is_initialized);
+
+       key = ftok(shm_keystring, id);
+       if (key == (key_t) -1)
+       {
+               perror("ftok");
+               return NULL;
+       }
+
+       shmid = shmget(key, size, flags | 0666);
+       if (shmid < 0)
+       {
+               perror("shmget");
+               return NULL;
+       }
+
+       /*
+        * Check whether the segment is already attached; if it is, it does
+        * not need to be added to the table again. Slots with a NULL address
+        * are free and must be skipped: their stored id is stale and could
+        * otherwise match a valid segment id.
+        */
+       for (i = 0; i < shm_current_id; i++)
+       {
+               if (shm_ptrs[i] != NULL && shm_memory_ids[i] == shmid)
+               {
+                       return shm_ptrs[i];
+               }
+       }
+
+       ptr = shmat(shmid, NULL, 0);
+       if (ptr == (void*)-1)
+       {
+               perror("shmat");
+               return NULL;
+       }
+
+       store_shared_memory(shmid, ptr);
+       return ptr;
+}
+
+/*
+ * Release the shared memory segment attached at `ptr`.
+ *
+ * The bookkeeping tables are searched for `ptr`; when found, the slot is
+ * cleared and the segment is handed to release_shared_memory_id().
+ *
+ * Returns the result of release_shared_memory_id(), or false when `ptr`
+ * is NULL or is not a tracked address.
+ */
+bool release_shared_memory(void *ptr)
+{
+       int i;
+       int memory_id = -1;
+       bool found = false;
+
+       assert(shm_is_initialized);
+
+       /* NULL marks free slots in the table, so it can never be a tracked address */
+       if (ptr == NULL)
+               return false;
+
+       /* find the memory id accompanying the given pointer */
+       for (i = 0; i < shm_current_id; i++)
+       {
+               if (shm_ptrs[i] == ptr)
+               {
+                       memory_id = shm_memory_ids[i];
+                       /* -1 is never a valid segment id, whereas 0 can be one */
+                       shm_memory_ids[i] = -1;
+                       shm_ptrs[i] = NULL;
+                       found = true;
+                       break;
+               }
+       }
+
+       /* an explicit flag is needed: a segment id of 0 is valid, so the id
+        * itself cannot double as the "found" indicator */
+       if (!found)
+               return false;
+
+       /* actually release the memory at the given id */
+       return release_shared_memory_id(memory_id, ptr);
+}
+
+/*
+ * Mark the segment `memory_id` for removal and detach `ptr` from this
+ * process.
+ *
+ * Both steps are always attempted: a failing shmctl() must not leave the
+ * segment attached. Each failure is reported with perror().
+ *
+ * Returns true only when both shmctl(IPC_RMID) and shmdt() succeeded.
+ */
+bool release_shared_memory_id(int memory_id, void *ptr)
+{
+       bool ok = true;
+
+       if (shmctl(memory_id, IPC_RMID, NULL) == -1)
+       {
+               perror("shmctl");
+               ok = false;
+       }
+       if (shmdt(ptr) == -1)
+       {
+               perror("shmdt");
+               ok = false;
+       }
+       return ok;
+}
+
+/*
+ * Obtain a System V semaphore set for `id`.
+ *
+ * id:    project id handed to ftok() together with shm_keystring.
+ * count: number of semaphores in the set, handed to semget().
+ * flags: extra semget() flags (e.g. IPC_CREAT); 0666 permissions are
+ *        always or-ed in.
+ *
+ * Returns the semaphore set id, or a negative value after reporting the
+ * failing call with perror().
+ */
+int init_process_semaphore(int id, int count, int flags)
+{
+       /* ftok() returns key_t; keep that type so the error comparison is exact */
+       key_t key = ftok(shm_keystring, id);
+       int semid = -1;
+
+       if (key == (key_t) -1)
+       {
+               perror("ftok");
+               return semid;
+       }
+       semid = semget(key, count, flags | 0666);
+       if (semid < 0)
+       {
+               perror("semget failed: ");
+       }
+       return semid;
+}
+
+/*
+ * Obtain the semaphore set for `id` with `count` semaphores, passing
+ * IPC_CREAT to init_process_semaphore(). Returns whatever that call returns.
+ */
+int create_process_semaphore(int id, int count)
+{
+       int semid = init_process_semaphore(id, count, IPC_CREAT);
+
+       return semid;
+}
+
+/*
+ * Obtain the semaphore set for `sem_id` without requesting creation (no
+ * extra flags are passed to init_process_semaphore()).
+ * Note: `sem_id` is forwarded as the `id` argument of
+ * init_process_semaphore(), i.e. it is the value fed to ftok(), not a
+ * value previously returned by semget().
+ */
+int get_process_semaphore(int sem_id, int count)
+{
+       const int no_flags = 0;
+
+       return init_process_semaphore(sem_id, count, no_flags);
+}
+
+/*
+ * Query semaphore `number` of the set `sem_id` with semctl(GETVAL).
+ * Returns the semctl() result; when that is negative the failure is
+ * first reported with perror().
+ */
+int get_semaphore_value(int sem_id, int number)
+{
+       const int current = semctl(sem_id, number, GETVAL, 0);
+
+       if (current >= 0)
+               return current;
+
+       perror("semctl failed: ");
+       return current;
+}
+
+/*
+ * Apply a single semop() to semaphore `number` of the set `sem_id`, with
+ * `change` as the operation value and no operation flags.
+ * Returns true on success; on failure reports with perror() and returns
+ * false.
+ */
+bool change_semaphore_value(int sem_id, int number, int change)
+{
+       struct sembuf op = { .sem_num = number, .sem_op = change, .sem_flg = 0 };
+       const bool succeeded = !(semop(sem_id, &op, 1) < 0);
+
+       if (!succeeded)
+               perror("semop failed: ");
+
+       return succeeded;
+}
+
+bool release_process_semaphore(int sem_id)
+{
+    if (semctl(sem_id, 0, IPC_RMID) < 0)
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to