Changeset: ae06ee4938d6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae06ee4938d6
Modified Files:
        monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:

Added support for varres (a variable number of return columns) in PyAPIeval; this is not yet available at the SQL level.


diffs (219 lines):

diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -423,6 +423,8 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
 #endif
     PyGILState_STATE gstate = 0;
     int j;
+    bit varres = sqlfun ? sqlfun->varres : 0;
+    int retcols = !varres ? pci->retc : -1;
     size_t iu;
 
     (void) cntxt;
@@ -900,23 +902,43 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
             // For dictionary returns we need to map each of the (key,value) 
pairs to the proper return value
             // We first analyze the SQL Function structure for a list of 
return value names
             char **retnames = NULL;
-            if (sqlfun != NULL) {
-                retnames = GDKzalloc(sizeof(char*) * sqlfun->res->cnt);
-                argnode = sqlfun->res->h;
-                for(i = 0; i < sqlfun->res->cnt; i++) {
-                    retnames[i] = ((sql_arg*)argnode->data)->name;
-                    argnode = argnode->next;
+            if (!varres)
+            {
+                if (sqlfun != NULL) {
+                    retnames = GDKzalloc(sizeof(char*) * sqlfun->res->cnt);
+                    argnode = sqlfun->res->h;
+                    for(i = 0; i < sqlfun->res->cnt; i++) {
+                        retnames[i] = ((sql_arg*)argnode->data)->name;
+                        argnode = argnode->next;
+                    }
+                } else {
+                    msg = createException(MAL, "pyapi.eval", "Return value is 
a dictionary, but there is no sql function object, so we don't know the return 
value names and mapping cannot be done.");
+                    goto wrapup;
                 }
             } else {
-                msg = createException(MAL, "pyapi.eval", "Return value is a 
dictionary, but there is no sql function object, so we don't know the return 
value names and mapping cannot be done.");
-                goto wrapup;
+                // If there are a variable number of return types, we take the 
column names from the dictionary
+                PyObject *keys = PyDict_Keys(pResult);
+                retcols = PyList_Size(keys);
+                retnames = GDKzalloc(sizeof(char*) * retcols);
+                for(i = 0; i < retcols; i++) {
+                    PyObject *colname = PyList_GetItem(keys, i);
+                    if (!PyString_CheckExact(colname)) {
+                        msg = createException(MAL, "pyapi.eval", "Expected a 
string key in the dictionary, but received an object of type %s", 
colname->ob_type->tp_name);
+                        goto wrapup;
+                    }
+                    retnames[i] = ((PyStringObject*)colname)->ob_sval;
+                }
             }
-            pResult = PyDict_CheckForConversion(pResult, pci->retc, retnames, 
&msg);
+            pResult = PyDict_CheckForConversion(pResult, retcols, retnames, 
&msg);
             if (retnames != NULL) GDKfree(retnames);
-        } else {
+        } else if (varres) {
+            msg = createException(MAL, "pyapi.eval", "Expected a variable 
number return values, but the return type was not a dictionary. We require the 
return type to be a dictionary for column naming purposes.");
+            goto wrapup;
+        }
+        else {
             // Now we need to do some error checking on the result object, 
because the result object has to have the correct type/size
             // We will also do some converting of result objects to a common 
type (such as scalar -> [[scalar]])
-            pResult = PyObject_CheckForConversion(pResult, pci->retc, NULL, 
&msg);
+            pResult = PyObject_CheckForConversion(pResult, retcols, NULL, 
&msg);
         }
         if (pResult == NULL) {
             goto wrapup;
@@ -924,7 +946,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
     }
     VERBOSE_MESSAGE("Collecting return values.\n");
 
-
+    if (varres) {
+        GDKfree(pyreturn_values);
+        pyreturn_values = GDKzalloc(retcols * sizeof(PyReturn));
+    }
 
     // Now we have executed the Python function, we have to collect the return 
values and convert them to BATs
     // We will first collect header information about the Python return 
objects and extract the underlying C arrays
@@ -933,7 +958,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
     // The reason we are doing this as a separate step is because this 
preprocessing requires us to call the Python API
     // Whereas the actual returning does not require us to call the Python API
     // This means we can do the actual returning without holding the GIL
-    if (!PyObject_PreprocessObject(pResult, pyreturn_values, pci->retc, &msg)) 
{
+    if (!PyObject_PreprocessObject(pResult, pyreturn_values, retcols, &msg)) {
         goto wrapup;
     }
 
@@ -963,10 +988,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
 #endif
         // Now we will write data about our result (memory size, type, number 
of elements) to the header
         ptr = (ReturnBatDescr*)shm_ptr;
-        for (i = 0; i < pci->retc; i++)
+        for (i = 0; i < retcols; i++)
         {
             PyReturn *ret = &pyreturn_values[i];
-            ReturnBatDescr *descr = &ptr[(process_id - 1) * pci->retc + i];
+            ReturnBatDescr *descr = &ptr[(process_id - 1) * retcols + i];
 
             if (ret->result_type == NPY_OBJECT) {
                 // We can't deal with NPY_OBJECT arrays, because these are 
'arrays of pointers', so we can't just copy the content of the array into 
shared memory
@@ -1020,7 +1045,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
         {
             // So if we get here, we know all processes have finished and that 
we are the last process to pass the first semaphore
             // Since we are the last process, it is our job to create the 
shared memory for each of the return values
-            for (i = 0; i < pci->retc; i++)
+            for (i = 0; i < retcols; i++)
             {
                 size_t return_size = 0;
                 size_t mask_size = 0;
@@ -1028,7 +1053,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
                 // Now we will count the size of the return values for each of 
the processes
                 for(j = 0; j < process_count; j++)
                 {
-                     ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * 
pci->retc + i]);
+                     ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * 
retcols + i]);
                      return_size += descr->bat_size;
                      mask_size += descr->bat_count * sizeof(bool);
                      has_mask = has_mask || descr->has_mask;
@@ -1044,7 +1069,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
                 if (has_mask)
                 {
                     assert(mask_size > 0);
-                    if (create_shared_memory(shm_id + pci->retc + (i + 1), 
mask_size, NULL) != MAL_SUCCEED) //create a memory space for the mask
+                    if (create_shared_memory(shm_id + retcols + (i + 1), 
mask_size, NULL) != MAL_SUCCEED) //create a memory space for the mask
                     {
                         msg = createException(MAL, "pyapi.eval", "Failed to 
allocate shared memory for returning mask.\n");
                         goto wrapup;
@@ -1067,11 +1092,11 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
             // However, we do not know if any of the other childs have failed
             // If they have, they have written descr->npy_type to -1 in one of 
their headers
             // So we check for that
-            for (i = 0; i < pci->retc; i++)
+            for (i = 0; i < retcols; i++)
             {
                 for(j = 0; j < process_count; j++)
                 {
-                    ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * 
pci->retc + i]);
+                    ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * 
retcols + i]);
                     if (descr->npy_type < 0)
                     {
                         // If any of the child processes have failed, exit 
without an error code because we did not fail
@@ -1083,7 +1108,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
         }
 
         // Now we can finally return the values
-        for (i = 0; i < pci->retc; i++)
+        for (i = 0; i < retcols; i++)
         {
             char *mem_ptr;
             PyReturn *ret = &pyreturn_values[i];
@@ -1095,7 +1120,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
             bool has_mask = false;
             for(j = 0; j < process_count; j++)
             {
-                ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * pci->retc 
+ i]);
+                ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * retcols + 
i]);
                 if (j < (process_id - 1))
                 {
                    start_size += descr->bat_size;
@@ -1118,7 +1143,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
 
             if (has_mask) {
                 bool *mask_ptr;
-                msg = get_shared_memory(shm_id + pci->retc + (i + 1), 
mask_size, (void**) &mask_ptr);
+                msg = get_shared_memory(shm_id + retcols + (i + 1), mask_size, 
(void**) &mask_ptr);
                 // If any of the processes return a mask, we need to write our 
mask values to the shared memory
 
                 if (mask_ptr == NULL) {
@@ -1151,17 +1176,23 @@ returnvalues:
     /*[RETURN_VALUES]*/
     VERBOSE_MESSAGE("Returning values.\n");
 
-    for (i = 0; i < pci->retc; i++)
+    for (i = 0; i < retcols; i++)
     {
         PyReturn *ret = &pyreturn_values[i];
-        int bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i)));
+        int bat_type = TYPE_any;
+        if (!varres) {
+            bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i)));
 
-        if (bat_type == TYPE_any || bat_type == TYPE_void) {
-            getArgType(mb,pci,i) = bat_type;
-            msg = createException(MAL, "pyapi.eval", "Unknown return value, 
possibly projecting with no parameters.");
-            goto wrapup;
+            if (bat_type == TYPE_any || bat_type == TYPE_void) {
+                getArgType(mb,pci,i) = bat_type;
+                msg = createException(MAL, "pyapi.eval", "Unknown return 
value, possibly projecting with no parameters.");
+                goto wrapup;
+            }
+        } else {
+            bat_type = PyType_ToBat(ret->result_type);
         }
 
+
         b = PyObject_ConvertToBAT(ret, bat_type, i, seqbase, &msg);
         if (b == NULL) {
             goto wrapup;
@@ -1200,8 +1231,8 @@ returnvalues:
 
         // To indicate that we failed, we will write information to our header
         ptr = (ReturnBatDescr*)shm_ptr;
-        for (i = 0; i < pci->retc; i++) {
-            ReturnBatDescr *descr = &ptr[(process_id - 1) * pci->retc + i];
+        for (i = 0; i < retcols; i++) {
+            ReturnBatDescr *descr = &ptr[(process_id - 1) * retcols + i];
             // We will write descr->npy_type to -1, so other processes can see 
that we failed
             descr->npy_type = -1;
             // We will write the memory size of our error message to the 
bat_size, so the main process can access the shared memory
@@ -1245,7 +1276,7 @@ returnvalues:
         //thus we need to free python objects, thus we need to obtain the GIL
         gstate = PyGILState_Ensure();
     }
-    for (i = 0; i < pci->retc; i++) {
+    for (i = 0; i < retcols; i++) {
         PyReturn *ret = &pyreturn_values[i];
         // First clean up any return values
         if (!ret->multidimensional) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to