Changeset: 0561eaacb72e for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0561eaacb72e
Added Files:
        sql/backends/monet5/Tests/pyapi19.sql
        sql/backends/monet5/Tests/pyapi19.stable.err
        sql/backends/monet5/Tests/pyapi19.stable.out
Modified Files:
        gdk/shared_memory.c
        gdk/shared_memory.h
        monetdb5/extras/pyapi/connection.c
        monetdb5/extras/pyapi/connection.h
        monetdb5/extras/pyapi/lazyarray.c
        monetdb5/extras/pyapi/pyapi.c
        monetdb5/extras/pyapi/pytypes.h
        sql/backends/monet5/Tests/All
        sql/backends/monet5/Tests/pyapi25.sql
        sql/backends/monet5/Tests/pyapi25.stable.err
        sql/backends/monet5/Tests/pyapi25.stable.out
        tools/embeddedpy/embeddedpy.c
Branch: pyapi
Log Message:

When a loopback query occurs in a forked process, send the query back to the 
main mserver and execute the query there, then ship the result back using 
shared memory or mmap. Because all queries are executed in the main process 
this resolves all locking issues, and we can now perform any type of query.


diffs (truncated from 1962 to 300 lines):

diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -23,6 +23,7 @@
 #include <sched.h>
 #include <errno.h>
 #include <sys/sem.h>
+#include <time.h>
 
 static lng *shm_memory_ids;
 static void **shm_ptrs;
@@ -33,14 +34,22 @@ static int shm_is_initialized = false;
 static MT_Lock release_memory_lock;
 static key_t base_key = 800000000;
 
+#define SHM_SHARED 1
+#define SHM_MEMMAP 2
+#define SHM_EITHER 3
 
-str init_shared_memory(int id, size_t size, void **ptr, int flags);
+static int memtype = SHM_SHARED;
+
+
+str init_shared_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid);
 void store_shared_memory(lng memory_id, void *ptr);
 str release_shared_memory_id(int memory_id, void *ptr);
 
-str init_mmap_memory(int id, size_t size, void **ptr, int flags);
+str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid);
 str release_mmap_memory(void *ptr, size_t size);
 
+str init_process_semaphore(int id, int count, int flags, int *semid);
+
 str initialize_shared_memory(void)
 {
        if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as 
well
@@ -104,11 +113,13 @@ int get_unique_shared_memory_id(int offs
        return id;
 }
 
-str init_mmap_memory(int id, size_t size, void **return_ptr, int flags)
+str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid)
 {   
     char address[100];
     void *ptr;
     int fd, result;
+    // TODO: memmap shouldn't be in tmp directory
+    // TODO: we should just use GDKmmap, try to get that to work
     snprintf(address, 100, "/tmp/temp_pyapi_mmap_%d", id);
 
     fd = open(address, flags | O_RDWR, MONETDB_MODE);
@@ -141,14 +152,16 @@ str init_mmap_memory(int id, size_t size
         errno = 0;
         return createException(MAL, "shared_memory.get", "Failure in 
mmap(NULL, %zu, PROT_WRITE, MAP_SHARED, %d, 0): %s", size, fd, err);
     }
-    store_shared_memory(size, ptr);
+    if (reg) store_shared_memory(size, ptr);
     if (return_ptr != NULL) *return_ptr = ptr;
+    if (return_shmid != NULL) *return_shmid = id;
     return MAL_SUCCEED;
 }
 
 str release_mmap_memory(void *ptr, size_t size)
 {
     int ret;
+    // TODO: Actually delete files on disk
     ret = munmap(ptr, size);
     if (ret != 0) {
         char *err = strerror(errno);
@@ -158,19 +171,46 @@ str release_mmap_memory(void *ptr, size_
     return MAL_SUCCEED;
 }
 
-str create_shared_memory(int id, size_t size, void **return_ptr)
+// Detach the shared memory segment mapped at `ptr` from this process by
+// calling shmdt(). Unlike release_shared_memory_id() this does not call
+// shmctl(IPC_RMID), it only detaches.
+// Returns MAL_SUCCEED on success; if shmdt() returns -1 a MAL exception
+// carrying the strerror() text is returned and errno is reset to 0.
+str release_shared_memory_ptr(void *ptr)
+{
+    if (shmdt(ptr) == -1)
+    {
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.release", "Error calling 
shmdt(ptr:%p): %s", ptr, err);
+    }
+    return MAL_SUCCEED;
+}
+
+str create_shared_memory(int id, size_t size, bool reg, void **return_ptr, lng 
*return_shmid)
 {
     char *shared, *mmap;
-    if ((shared = init_shared_memory(id, size, return_ptr, IPC_CREAT)) == 
MAL_SUCCEED) return MAL_SUCCEED;
-    if ((mmap = init_mmap_memory(id, size, return_ptr, O_CREAT)) == 
MAL_SUCCEED) return MAL_SUCCEED;
+    if (memtype == SHM_SHARED)
+    {
+        return init_shared_memory(id, size, return_ptr, IPC_CREAT, reg, 
return_shmid);
+    }
+    if (memtype == SHM_MEMMAP)
+    {
+        return init_mmap_memory(id, size, return_ptr, O_CREAT, reg, 
return_shmid);
+    }
+    if ((shared = init_shared_memory(id, size, return_ptr, IPC_CREAT, reg, 
return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED;
+    if ((mmap = init_mmap_memory(id, size, return_ptr, O_CREAT, reg, 
return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED;
     return createException(MAL, "shared_memory.release_mmap_memory", "Failed 
to create shared memory or mmap space.\nshared memory error: %s\nmmap error: 
%s", shared, mmap);
 }
 
-str get_shared_memory(int id, size_t size, void **return_ptr)
+str get_shared_memory(int id, size_t size, bool reg, void **return_ptr, lng 
*return_shmid)
 {
     char *shared, *mmap;
-    if ((shared = init_shared_memory(id, size, return_ptr, 0)) == MAL_SUCCEED) 
return MAL_SUCCEED;
-    if ((mmap = init_mmap_memory(id, size, return_ptr, 0)) == MAL_SUCCEED) 
return MAL_SUCCEED;
+    if (memtype == SHM_SHARED)
+    {
+        return init_shared_memory(id, size, return_ptr, 0, reg, return_shmid);
+    }
+    if (memtype == SHM_MEMMAP)
+    {
+        return init_mmap_memory(id, size, return_ptr, 0, reg, return_shmid);
+    }
+    if ((shared = init_shared_memory(id, size, return_ptr, 0, reg, 
return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED;
+    if ((mmap = init_mmap_memory(id, size, return_ptr, 0, reg, return_shmid)) 
== MAL_SUCCEED) return MAL_SUCCEED;
     return createException(MAL, "shared_memory.release_mmap_memory", "Failed 
to get shared memory or mmap space.\nshared memory error: %s\nmmap error: %s", 
shared, mmap);
 }
 
@@ -181,7 +221,7 @@ str ftok_enhanced(int id, key_t *return_
     return MAL_SUCCEED;
 }
 
-str init_shared_memory(int id, size_t size, void **return_ptr, int flags)
+str init_shared_memory(int id, size_t size, void **return_ptr, int flags, bool 
reg, lng *return_shmid)
 {
     lng shmid;
     void *ptr;
@@ -205,16 +245,19 @@ str init_shared_memory(int id, size_t si
         return createException(MAL, "shared_memory.get", "Error calling 
shmget(key:%zu,size:%zu,flags:%d): %s", (size_t)key, size, flags, err);
     }
 
-    //check if the shared memory segment is already created, if it is we do 
not need to add it to the table and can simply return the pointer
-    for(i = 0; i < shm_current_id; i++)
-    {
-        if (shm_memory_ids[i] == shmid)
+    if (reg) {
+        //check if the shared memory segment is already created, if it is we 
do not need to add it to the table and can simply return the pointer
+        for(i = 0; i < shm_current_id; i++)
         {
-            if (return_ptr != NULL) *return_ptr = shm_ptrs[i];
-            return MAL_SUCCEED;
+            if (shm_memory_ids[i] == shmid)
+            {
+                if (return_ptr != NULL) *return_ptr = shm_ptrs[i];
+                return MAL_SUCCEED;
+            }
         }
     }
 
+
        ptr = shmat(shmid, NULL, 0);
     if (ptr == (void*)-1)
     {
@@ -223,7 +266,8 @@ str init_shared_memory(int id, size_t si
         return createException(MAL, "shared_memory.get", "Error calling 
shmat(id:%lld,NULL,0): %s", shmid, err);
     }
 
-    store_shared_memory(shmid, ptr);
+    if (reg) store_shared_memory(shmid, ptr);
+    if (return_shmid != NULL) *return_shmid = shmid;
     if (return_ptr != NULL) *return_ptr = ptr;
     return MAL_SUCCEED;
 }
@@ -248,30 +292,138 @@ str release_shared_memory(void *ptr)
                }
        }
     MT_lock_unset(&release_memory_lock, "release_memory_lock");
+    return release_shared_memory_shmid(memory_id, ptr);
+}
 
-       assert(memory_id);
-       //actually release the memory at the given ID
-       if (release_shared_memory_id(memory_id, ptr) == MAL_SUCCEED) return 
MAL_SUCCEED;
+str release_shared_memory_shmid(int memory_id, void *ptr)
+{
+    assert(memory_id);
+
+    if (memtype == SHM_SHARED)
+    {
+        return release_shared_memory_id(memory_id, ptr);
+    }
+    if (memtype == SHM_MEMMAP)
+    {
+        return release_mmap_memory(ptr, memory_id);
+    }
+    if (release_shared_memory_id(memory_id, ptr) == MAL_SUCCEED) return 
MAL_SUCCEED;
     if (release_mmap_memory(ptr, memory_id) == MAL_SUCCEED) return MAL_SUCCEED;
     return createException(MAL, "shared_memory.release", "Failed to release 
shared memory.");
 }
 
 str release_shared_memory_id(int memory_id, void *ptr)
-{
+{   
+    if (shmdt(ptr) == -1)
+    {
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.release", "Error calling 
shmdt(ptr:%p): %s", ptr, err);
+    }
        if (shmctl(memory_id, IPC_RMID, NULL) == -1)
        {
         char *err = strerror(errno);
         errno = 0;
         return createException(MAL, "shared_memory.release", "Error calling 
shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err);
        }
-       if (shmdt(ptr) == -1)
-       {
+       return MAL_SUCCEED;
+}
+
+str init_process_semaphore(int id, int count, int flags, int *semid)
+{
+    str msg = MAL_SUCCEED;
+    int key;
+    msg = ftok_enhanced(id, &key);
+    if (msg != MAL_SUCCEED) {
+        return msg;
+    }
+    *semid = semget(key, count, flags | 0666);
+    if (*semid < 0) {
         char *err = strerror(errno);
         errno = 0;
-        return createException(MAL, "shared_memory.release", "Error calling 
shmdt(ptr:%p): %s", ptr, err);
-       }
-       return MAL_SUCCEED;
+        return createException(MAL, "semaphore.init", "Error calling 
semget(key:%d,nsems:%d,semflg:%d): %s", key, count, flags | 0666, err);
+    }
+    return msg;
 }
+
+// Obtain a semaphore set of `count` semaphores for key id `id`, passing
+// IPC_CREAT to init_process_semaphore(). The semget() result is stored in
+// *semid; the return value is whatever init_process_semaphore() returns.
+str create_process_semaphore(int id, int count, int *semid)
+{
+    return init_process_semaphore(id, count, IPC_CREAT, semid);
+}
+
+// Look up a semaphore set without requesting creation (flags == 0).
+// NOTE: despite its name, `sem_id` is forwarded as the `id` argument of
+// init_process_semaphore(), i.e. it is the key id fed to ftok_enhanced(),
+// not a semget() identifier. The semget() result is written to *semid.
+str get_process_semaphore(int sem_id, int count, int *semid)
+{
+    return init_process_semaphore(sem_id, count, 0, semid);
+}
+
+// Read the current value of semaphore `number` in the set `sem_id` via
+// semctl(GETVAL) and store it in *semval.
+// Returns MAL_SUCCEED on success. If semctl() yields a negative value,
+// *semval holds that negative value, errno is reset to 0 and a MAL
+// exception with the strerror() text is returned.
+str get_semaphore_value(int sem_id, int number, int *semval)
+{
+    *semval = semctl(sem_id, number, GETVAL, 0);
+    if (*semval < 0)
+    {
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "semaphore.init", "Error calling 
semctl(semid:%d,semnum:%d,cmd:%d,param:0): %s", sem_id, number, GETVAL, err);
+    }
+    return MAL_SUCCEED;
+}
+
+// Apply `change` (the sem_op value) to semaphore `number` of the set
+// `sem_id` with a single semop() call. sem_flg is 0, so neither IPC_NOWAIT
+// nor SEM_UNDO is requested.
+// Returns MAL_SUCCEED on success; if semop() fails, errno is reset to 0 and
+// a MAL exception with the strerror() text is returned.
+str change_semaphore_value(int sem_id, int number, int change)
+{
+    str msg = MAL_SUCCEED;
+    struct sembuf buffer;
+    buffer.sem_num = number;
+    buffer.sem_op = change;
+    buffer.sem_flg = 0;
+
+    if (semop(sem_id, &buffer, 1) < 0)
+    {
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "semaphore.init", "Error calling 
semop(semid:%d, sops: { sem_num:%d, sem_op:%d, sem_flag: %d }, nsops:1): %s", 
sem_id, number, change, 0, err);
+    }
+    return msg;
+}
+
+// Apply `change` (the sem_op value) to semaphore `number` of the set
+// `sem_id`, waiting at most `timeout_mseconds` milliseconds.
+// *succeed is set to true only when semtimedop() performed the operation.
+// A failure with errno == EAGAIN (the time limit expired) is not treated as
+// an error: MAL_SUCCEED is returned with *succeed left false. Any other
+// failure resets errno to 0 and returns a MAL exception with the
+// strerror() text.
+str change_semaphore_value_timeout(int sem_id, int number, int change, int 
timeout_mseconds, bool *succeed)
+{
+    str msg = MAL_SUCCEED;
+    struct timespec timeout;
+    struct sembuf buffer;
+    buffer.sem_num = number;
+    buffer.sem_op = change;
+    buffer.sem_flg = 0;
+    *succeed = false;
+
+    // struct timespec is seconds + nanoseconds. One millisecond is 1000000
+    // nanoseconds; multiplying by 1000 yields microseconds and would shorten
+    // the sub-second part of the wait by a factor of 1000.
+    timeout.tv_sec = (timeout_mseconds / 1000);
+    timeout.tv_nsec = (timeout_mseconds % 1000) * 1000000L;
+
+    if (semtimedop(sem_id, &buffer, 1, &timeout) < 0)
+    {
+        if (errno == EAGAIN) {
+            // timed out: report "not succeeded" without raising an error
+            errno = 0;
+            return MAL_SUCCEED;
+        } else {
+            char *err = strerror(errno);
+            errno = 0;
+            return createException(MAL, "semaphore.init", "Error calling 
semtimedop(semid:%d, sops: { sem_num:%d, sem_op:%d, sem_flag: %d }, nsops:1): 
%s", sem_id, number, change, 0, err);
+        }
+    }
+    *succeed = true;
+    return msg;
+}
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to