Changeset: ae06ee4938d6 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae06ee4938d6 Modified Files: monetdb5/extras/pyapi/pyapi.c Branch: pyapi Log Message:
Added support for varres in PyAPIeval (but not yet available on SQL level). diffs (219 lines): diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c --- a/monetdb5/extras/pyapi/pyapi.c +++ b/monetdb5/extras/pyapi/pyapi.c @@ -423,6 +423,8 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb #endif PyGILState_STATE gstate = 0; int j; + bit varres = sqlfun ? sqlfun->varres : 0; + int retcols = !varres ? pci->retc : -1; size_t iu; (void) cntxt; @@ -900,23 +902,43 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb // For dictionary returns we need to map each of the (key,value) pairs to the proper return value // We first analyze the SQL Function structure for a list of return value names char **retnames = NULL; - if (sqlfun != NULL) { - retnames = GDKzalloc(sizeof(char*) * sqlfun->res->cnt); - argnode = sqlfun->res->h; - for(i = 0; i < sqlfun->res->cnt; i++) { - retnames[i] = ((sql_arg*)argnode->data)->name; - argnode = argnode->next; + if (!varres) + { + if (sqlfun != NULL) { + retnames = GDKzalloc(sizeof(char*) * sqlfun->res->cnt); + argnode = sqlfun->res->h; + for(i = 0; i < sqlfun->res->cnt; i++) { + retnames[i] = ((sql_arg*)argnode->data)->name; + argnode = argnode->next; + } + } else { + msg = createException(MAL, "pyapi.eval", "Return value is a dictionary, but there is no sql function object, so we don't know the return value names and mapping cannot be done."); + goto wrapup; } } else { - msg = createException(MAL, "pyapi.eval", "Return value is a dictionary, but there is no sql function object, so we don't know the return value names and mapping cannot be done."); - goto wrapup; + // If there are a variable number of return types, we take the column names from the dictionary + PyObject *keys = PyDict_Keys(pResult); + retcols = PyList_Size(keys); + retnames = GDKzalloc(sizeof(char*) * retcols); + for(i = 0; i < retcols; i++) { + PyObject *colname = PyList_GetItem(keys, i); + if (!PyString_CheckExact(colname)) { + msg = createException(MAL, "pyapi.eval", "Expected a string key in the dictionary, but received an object of type %s", colname->ob_type->tp_name); + goto wrapup; + } + retnames[i] = ((PyStringObject*)colname)->ob_sval; + } } - pResult = PyDict_CheckForConversion(pResult, pci->retc, retnames, &msg); + pResult = PyDict_CheckForConversion(pResult, retcols, retnames, &msg); if (retnames != NULL) GDKfree(retnames); - } else { + } else if (varres) { + msg = createException(MAL, "pyapi.eval", "Expected a variable number return values, but the return type was not a dictionary. We require the return type to be a dictionary for column naming purposes."); + goto wrapup; + } + else { // Now we need to do some error checking on the result object, because the result object has to have the correct type/size // We will also do some converting of result objects to a common type (such as scalar -> [[scalar]]) - pResult = PyObject_CheckForConversion(pResult, pci->retc, NULL, &msg); + pResult = PyObject_CheckForConversion(pResult, retcols, NULL, &msg); } if (pResult == NULL) { goto wrapup; @@ -924,7 +946,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb } VERBOSE_MESSAGE("Collecting return values.\n"); - + if (varres) { + GDKfree(pyreturn_values); + pyreturn_values = GDKzalloc(retcols * sizeof(PyReturn)); + } // Now we have executed the Python function, we have to collect the return values and convert them to BATs // We will first collect header information about the Python return objects and extract the underlying C arrays @@ -933,7 +958,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb // The reason we are doing this as a separate step is because this preprocessing requires us to call the Python API // Whereas the actual returning does not require us to call the Python API // This means we can do the actual returning without holding the GIL - if (!PyObject_PreprocessObject(pResult, pyreturn_values, pci->retc, &msg)) { + if (!PyObject_PreprocessObject(pResult, pyreturn_values, retcols, &msg)) { goto wrapup; } @@ -963,10 +988,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb #endif // Now we will write data about our result (memory size, type, number of elements) to the header ptr = (ReturnBatDescr*)shm_ptr; - for (i = 0; i < pci->retc; i++) + for (i = 0; i < retcols; i++) { PyReturn *ret = &pyreturn_values[i]; - ReturnBatDescr *descr = &ptr[(process_id - 1) * pci->retc + i]; + ReturnBatDescr *descr = &ptr[(process_id - 1) * retcols + i]; if (ret->result_type == NPY_OBJECT) { // We can't deal with NPY_OBJECT arrays, because these are 'arrays of pointers', so we can't just copy the content of the array into shared memory @@ -1020,7 +1045,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb { // So if we get here, we know all processes have finished and that we are the last process to pass the first semaphore // Since we are the last process, it is our job to create the shared memory for each of the return values - for (i = 0; i < pci->retc; i++) + for (i = 0; i < retcols; i++) { size_t return_size = 0; size_t mask_size = 0; @@ -1028,7 +1053,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb // Now we will count the size of the return values for each of the processes for(j = 0; j < process_count; j++) { - ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * pci->retc + i]); + ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * retcols + i]); return_size += descr->bat_size; mask_size += descr->bat_count * sizeof(bool); has_mask = has_mask || descr->has_mask; @@ -1044,7 +1069,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb if (has_mask) { assert(mask_size > 0); - if (create_shared_memory(shm_id + pci->retc + (i + 1), mask_size, NULL) != MAL_SUCCEED) //create a memory space for the mask + if (create_shared_memory(shm_id + retcols + (i + 1), mask_size, NULL) != MAL_SUCCEED) //create a memory space for the mask { msg = createException(MAL, "pyapi.eval", "Failed to allocate shared memory for returning mask.\n"); goto wrapup; @@ -1067,11 +1092,11 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb // However, we do not know if any of the other childs have failed // If they have, they have written descr->npy_type to -1 in one of their headers // So we check for that - for (i = 0; i < pci->retc; i++) + for (i = 0; i < retcols; i++) { for(j = 0; j < process_count; j++) { - ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * pci->retc + i]); + ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * retcols + i]); if (descr->npy_type < 0) { // If any of the child processes have failed, exit without an error code because we did not fail @@ -1083,7 +1108,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb } // Now we can finally return the values - for (i = 0; i < pci->retc; i++) + for (i = 0; i < retcols; i++) { char *mem_ptr; PyReturn *ret = &pyreturn_values[i]; @@ -1095,7 +1120,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb bool has_mask = false; for(j = 0; j < process_count; j++) { - ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * pci->retc + i]); + ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * retcols + i]); if (j < (process_id - 1)) { start_size += descr->bat_size; @@ -1118,7 +1143,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb if (has_mask) { bool *mask_ptr; - msg = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size, (void**) &mask_ptr); + msg = get_shared_memory(shm_id + retcols + (i + 1), mask_size, (void**) &mask_ptr); // If any of the processes return a mask, we need to write our mask values to the shared memory if (mask_ptr == NULL) { @@ -1151,17 +1176,23 @@ returnvalues: /*[RETURN_VALUES]*/ VERBOSE_MESSAGE("Returning values.\n"); - for (i = 0; i < pci->retc; i++) + for (i = 0; i < retcols; i++) { PyReturn *ret = &pyreturn_values[i]; - int bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i))); + int bat_type = TYPE_any; + if (!varres) { + bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i))); - if (bat_type == TYPE_any || bat_type == TYPE_void) { - getArgType(mb,pci,i) = bat_type; - msg = createException(MAL, "pyapi.eval", "Unknown return value, possibly projecting with no parameters."); - goto wrapup; + if (bat_type == TYPE_any || bat_type == TYPE_void) { + getArgType(mb,pci,i) = bat_type; + msg = createException(MAL, "pyapi.eval", "Unknown return value, possibly projecting with no parameters."); + goto wrapup; + } + } else { + bat_type = PyType_ToBat(ret->result_type); } + b = PyObject_ConvertToBAT(ret, bat_type, i, seqbase, &msg); if (b == NULL) { goto wrapup; @@ -1200,8 +1231,8 @@ returnvalues: // To indicate that we failed, we will write information to our header ptr = (ReturnBatDescr*)shm_ptr; - for (i = 0; i < pci->retc; i++) { - ReturnBatDescr *descr = &ptr[(process_id - 1) * pci->retc + i]; + for (i = 0; i < retcols; i++) { + ReturnBatDescr *descr = &ptr[(process_id - 1) * retcols + i]; // We will write descr->npy_type to -1, so other processes can see that we failed descr->npy_type = -1; // We will write the memory size of our error message to the bat_size, so the main process can access the shared memory @@ -1245,7 +1276,7 @@ returnvalues: //thus we need to free python objects, thus we need to obtain the GIL gstate = PyGILState_Ensure(); } - for (i = 0; i < pci->retc; i++) { + for (i = 0; i < retcols; i++) { PyReturn *ret = &pyreturn_values[i]; // First clean up any return values if (!ret->multidimensional) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list