Changeset: 4b9abd901b85 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4b9abd901b85
Added Files:
    monetdb5/modules/mal/mal_weld.h
Modified Files:
    gdk/Makefile.ag
    monetdb5/modules/mal/Makefile.ag
    monetdb5/modules/mal/mal_weld.c
    monetdb5/modules/mal/mal_weld.mal
    monetdb5/modules/mal/mal_weld.mal.sh
Branch: mal-weld
Log Message:
weld code for tpch06 diffs (truncated from 1187 to 300 lines): diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag --- a/gdk/Makefile.ag +++ b/gdk/Makefile.ag @@ -6,8 +6,7 @@ MTSAFE -INCLUDES = ../common/options ../common/stream ../common/utils \ - $(valgrind_CFLAGS) $(WELD_INCS) +INCLUDES = ../common/options ../common/stream ../common/utils $(valgrind_CFLAGS) lib_gdk = { VERSION = $(GDK_VERSION) @@ -40,8 +39,7 @@ lib_gdk = { ../common/stream/libstream \ ../common/utils/libmutils \ $(MATH_LIBS) $(SOCKET_LIBS) $(zlib_LIBS) $(BZ_LIBS) \ - $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) $(KVM_LIBS) \ - $(WELD_LIBS) + $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) $(KVM_LIBS) } headers_h = { diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag --- a/monetdb5/modules/mal/Makefile.ag +++ b/monetdb5/modules/mal/Makefile.ag @@ -11,7 +11,7 @@ INCLUDES = ../../mal ../atoms ../kernel ../../../common/utils \ ../../../gdk \ $(pcre_CFLAGS) $(zlib_CFLAGS) $(BZIP_INCS) $(MSGCONTROL_FLAGS) \ - $(openssl_CFLAGS) + $(openssl_CFLAGS) $(WELD_INCS) MTSAFE @@ -30,7 +30,7 @@ lib_mal = { language.c language.h \ mal_io.c mal_io.h \ mal_mapi.c mal_mapi.h \ - mal_weld.c \ + mal_weld.c mal_weld.h \ manual.c manual.h \ mat.c mat.h \ mdb.c mdb.h \ @@ -51,6 +51,7 @@ lib_mal = { sample.c sample.h \ json_util.c json_util.h \ calc.c batcalc.c + LIBS = $(WELD_LIBS) } headers_mal = { diff --git a/monetdb5/modules/mal/mal_weld.c b/monetdb5/modules/mal/mal_weld.c --- a/monetdb5/modules/mal/mal_weld.c +++ b/monetdb5/modules/mal/mal_weld.c @@ -10,60 +10,315 @@ #include "gdk.h" #include "mal_exception.h" #include "mal_interpreter.h" +#include "mal_instruction.h" +#include "mal_weld.h" +#include "weld.h" -mal_export str WeldInitState(ptr *retval); +#define STR_SIZE_INC 4096 /* growth step (in chars) of WeldRun's input/output statement buffers; also the size of getWeldCandList's static buffer */ +#define OP_GET 0 /* getOrSetStructMember: copy the struct member at *addr into *value */ +#define OP_SET 1 /* getOrSetStructMember: copy *value into the struct member at *addr */ + +/* Variables in Weld will be named vXX - e.g.
v19 + * */ + /* getOrSetStructMemberImpl(ADDR, TYPE, VALUE, OP): round the cursor *ADDR up to a multiple of sizeof(TYPE), then read (OP_GET) or write (OP_SET) one TYPE there through VALUE and advance *ADDR past it. This emulates C struct layout and assumes every type's alignment equals its size (TODO confirm against Weld's struct layout). NOTE(review): multi-statement macro without do { } while (0) and with unparenthesized arguments -- only safe because every call site below is wrapped in braces; the (long) pointer cast truncates on LLP64 targets (uintptr_t is portable). */ +#define getOrSetStructMemberImpl(ADDR, TYPE, VALUE, OP) \ + if ((long)*ADDR % sizeof(TYPE) != 0) \ + *ADDR += sizeof(TYPE) - (long)*ADDR % sizeof(TYPE); /* align */ \ + if (OP == OP_GET) \ + *(TYPE *)VALUE = *(TYPE *)(*ADDR); /* get */ \ + else \ + *(TYPE *)(*ADDR) = *(TYPE *)VALUE; /* set */ \ + *ADDR += sizeof(TYPE); /* increase */ + /* prependWeldStmt: insert weldStmt in front of wstate->program, first growing the buffer to exactly the required size if it is too small. NOTE(review): the realloc result is not checked (on failure the old buffer leaks and NULL is dereferenced), and strlen() of both strings is recomputed several times. */ +static void prependWeldStmt(weldState *wstate, str weldStmt) { + if (strlen(wstate->program) + strlen(weldStmt) >= wstate->programMaxLen) { + wstate->programMaxLen = strlen(wstate->program) + strlen(weldStmt) + 1; + wstate->program = realloc(wstate->program, wstate->programMaxLen * sizeof(char)); + } + memmove(wstate->program + strlen(weldStmt), wstate->program, strlen(wstate->program) + 1); + memcpy(wstate->program, weldStmt, strlen(weldStmt)); +} + /* appendWeldStmt: concatenate weldStmt to the end of wstate->program, growing the buffer the same way as prependWeldStmt. NOTE(review): same unchecked realloc. */ +static void appendWeldStmt(weldState *wstate, str weldStmt) { + if (strlen(wstate->program) + strlen(weldStmt) >= wstate->programMaxLen) { + wstate->programMaxLen = strlen(wstate->program) + strlen(weldStmt) + 1; + wstate->program = realloc(wstate->program, wstate->programMaxLen * sizeof(char)); + } + wstate->program = strcat(wstate->program, weldStmt); +} + /* getWeldType: name of the Weld type used for a MonetDB type; a type whose ATOMstorage differs from itself is mapped through its storage type. Returns NULL otherwise. NOTE(review): WeldRun passes this result straight to a %s conversion in sprintf, which is undefined behavior for NULL. */ +static str getWeldType(int type) { + if (type == TYPE_bte) + return "i8"; + else if (type == TYPE_int) + return "i32"; + else if (type == TYPE_lng) + return "i64"; + else if (type == TYPE_flt) + return "f32"; + else if (type == TYPE_dbl) + return "f64"; + else if (type == TYPE_str) + return "vec[i8]"; + else if (ATOMstorage(type) != type) + return getWeldType(ATOMstorage(type)); + else + return NULL; +} + /* getOrSetStructMember: run getOrSetStructMemberImpl with the C type that backs `type`; a type whose ATOMstorage differs from itself is resolved through its storage type. NOTE(review): TYPE_lng is accessed as C long, which is only 32 bits on LLP64 (Windows) while lng is 64 bits -- lng would be exact; any other type is silently ignored and *addr is not advanced; and `return <void expression>` in a void function violates an ISO C constraint. */ +static void getOrSetStructMember(char **addr, int type, void *value, int op) { + if (type == TYPE_bte) { + getOrSetStructMemberImpl(addr, char, value, op); + } else if (type == TYPE_int) { + getOrSetStructMemberImpl(addr, int, value, op); + } else if (type == TYPE_lng) { + getOrSetStructMemberImpl(addr, long, value, op); + } else if (type == TYPE_flt) { + getOrSetStructMemberImpl(addr, float, value, op); + } else
if (type == TYPE_dbl) { + getOrSetStructMemberImpl(addr, double, value, op); + } else if (type == TYPE_str) { + getOrSetStructMemberImpl(addr, char*, value, op); + } else if (type == TYPE_ptr) { + /* TODO - will assume that all pointers have the same size */ + getOrSetStructMemberImpl(addr, char*, value, op); + } else if (ATOMstorage(type) != type) { + return getOrSetStructMember(addr, ATOMstorage(type), value, op); + } +} + +/* Candidate lists can be dense so we have to replace them with a rangeiter. getWeldCandList returns the Weld expression for candidate list s: rangeiter(tseqbase, tseqbase + count, 1) when s refers to a dense BAT, otherwise the variable name vSID (also when s is nil or BATdescriptor fails). NOTE(review): the result lives in a static buffer, so the next call overwrites it and the function is not thread-safe; the BAT fixed by BATdescriptor is never released (no BBPunfix); %ld may not match the types of tseqbase/batCount -- cast explicitly. */ +static str getWeldCandList(int sid, bat s) { + static char candList[STR_SIZE_INC]; + BAT *list = is_bat_nil(s) ? NULL : BATdescriptor(s); + if (list == NULL || !BATtdense(list)) { + sprintf(candList, "v%d", sid); + } else { + sprintf(candList, "rangeiter(%ldL, %ldL, 1L)", list->tseqbase, + list->tseqbase + list->batCount); + } + return candList; +} + /* dumpWeldProgram: debug aid that writes wstate->program to a file named by tmpnam(NULL), writing a newline in place of every space that is followed by a tab, and an extra newline after every ';'. NOTE(review): tmpnam is race-prone and the chosen name is never reported, the fopen result is not checked (a NULL f crashes), and strlen() is re-evaluated on every loop iteration. */ +static void dumpWeldProgram(weldState *wstate) { + FILE *f = fopen(tmpnam(NULL), "w"); + int i; + for (i = 0; i < (int)strlen(wstate->program); i++) { + if (wstate->program[i] == ' ' && wstate->program[i + 1] == '\t') { + fputc('\n', f); + } else { + fputc(wstate->program[i], f); + } + if (wstate->program[i] == ';') { + fputc('\n', f); + } + } + fclose(f); +} + /* WeldInitState: allocate an empty weldState (program is "" with capacity 1) and return it through retval; WeldRun frees the state it is given. NOTE(review): the malloc/calloc results are not checked. */ str WeldInitState(ptr *retval) { - (void) retval; + weldState *wstate = malloc(sizeof(weldState)); + wstate->programMaxLen = 1; + wstate->program = calloc(wstate->programMaxLen, sizeof(char)); + *retval = wstate; return MAL_SUCCEED; } -mal_export str WeldRun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); str WeldRun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { /* WeldRun: wraps the program accumulated in the weldState (argument pci->retc) as |inputs| program {outputs} -- one vN per argument after the state, N being the MAL variable index, where a BAT argument becomes vN:vec[T] plus vNhseqbase:i64 -- compiles it with Weld, packs those arguments into a buffer laid out like the Weld input struct, runs the module and unpacks the result struct into the return variables (BAT results become new transient BATs whose heap is the buffer returned by Weld). The weldState is freed here. NOTE(review): every throw below leaks what was acquired before it (Weld error/module, inputStruct, fixed BATs); a single cleanup path would fix this. */ - (void) cntxt; - (void) mb; - (void) stk; - (void) pci; + (void)cntxt; + (void)mb; /* NOTE(review): mb is used below (getArgType), so this cast is unnecessary */ + weldState *wstate = *getArgReference_ptr(stk, pci, pci->retc); + int i, inputLen = 0, inputMaxLen = 0, outputLen = 0, outputMaxLen = 0; + str inputStmt = NULL, outputStmt = NULL; + + /* Build the input stmt, e.g.: |v13:i32, v50:vec[i8], v50hseqbase:i64| */ + for (i = pci->retc + 1; i < pci->argc;
i++) { /* skip wstate on pci->retc */ + if (inputLen + 128 > inputMaxLen) { + inputMaxLen += STR_SIZE_INC; + inputStmt = realloc(inputStmt, inputMaxLen * sizeof(char)); + } + int type = getArgType(mb, pci, i); + if (isaBatType(type)) { + inputLen += sprintf(inputStmt + inputLen, " v%d:vec[%s], v%dhseqbase:i64,", + getArg(pci, i), getWeldType(getBatType(type)), getArg(pci, i)); + } else { + inputLen += + sprintf(inputStmt + inputLen, " v%d:%s,", getArg(pci, i), getWeldType(type)); + } + } /* The leading space of the first entry and the trailing comma of the last are overwritten by the | delimiters. NOTE(review): with no arguments after the state, inputStmt is still NULL (and inputLen 0) here, so both writes below are invalid; the realloc results above are unchecked; 128 is the assumed upper bound for one formatted entry. */ + inputStmt[0] = '|'; + inputStmt[inputLen - 1] = '|'; + prependWeldStmt(wstate, inputStmt); + free(inputStmt); + /* Build the output stmt, e.g.: {v1, v99} */ + for (i = 0; i < pci->retc; i++) { + if (outputLen + 128 > outputMaxLen) { + outputMaxLen += STR_SIZE_INC; + outputStmt = realloc(outputStmt, outputMaxLen * sizeof(char)); + } + outputLen += sprintf(outputStmt + outputLen, " v%d,", getArg(pci, i)); + } /* Same trick with the braces. NOTE(review): outputStmt is NULL if the instruction has no return values. */ + outputStmt[0] = '{'; + outputStmt[outputLen - 1] = '}'; + appendWeldStmt(wstate, outputStmt); + free(outputStmt); + + weld_error_t e = weld_error_new(); + weld_conf_t conf = weld_conf_new(); + (void)dumpWeldProgram; /* suppress the unused warning */ +#ifdef WELD_DEBUG + dumpWeldProgram(wstate); + weld_conf_set(conf, "weld.compile.dumpCode", "true"); + weld_conf_set(conf, "weld.compile.dumpCodeDir", "/tmp"); +#endif + weld_module_t m = weld_module_compile(wstate->program, conf, e); + weld_conf_free(conf); + free(wstate->program); + free(wstate); /* the state is consumed here; the MAL ptr variable that held it now dangles -- presumably it must not be reused, confirm */ + if (weld_error_code(e)) { + throw(MAL, "weld.run", PROGRAM_GENERAL ": %s", weld_error_message(e)); + } /* NOTE(review): e is not freed on this path; weld_error_free is only reached on success */ + + /* Prepare the input for Weld.
We're building an array that has the layout of a struct */ + /* Max possible size is when we only have bats, so we have 1 ptr for the array, + * 1 lng for the size and 1 lng for hseqbase */ + char *inputStruct = malloc((pci->argc - pci->retc) * (sizeof(void *) + 2 * sizeof(lng))); /* NOTE(review): malloc result is not checked */ + char *inputPtr = inputStruct; + for (i = pci->retc + 1; i < pci->argc; i++) { /* skip wstate on pci->retc */ + int type = getArgType(mb, pci, i); + if (isaBatType(type)) { + bat bid = *getArgReference_bat(stk, pci, i); + BAT *b = BATdescriptor(bid); + if (b == NULL) throw(MAL, "weld.run", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING); /* NOTE(review): this throw leaks inputStruct, m and e; and the BATs fixed by BATdescriptor are never released (no BBPunfix in this function) */ + /* TODO handle string columns. Each BAT is passed as a Weld vec {heap base pointer, count} followed by its hseqbase, matching vN:vec[T], vNhseqbase:i64 in the input stmt */ + getOrSetStructMember(&inputPtr, TYPE_ptr, &b->theap.base, OP_SET); + getOrSetStructMember(&inputPtr, TYPE_lng, &b->batCount, OP_SET); + getOrSetStructMember(&inputPtr, TYPE_lng, &b->hseqbase, OP_SET); + } else { + getOrSetStructMember(&inputPtr, type, getArgReference(stk, pci, i), OP_SET); + if (type == TYPE_str) { /* a str is a Weld vec[i8]: the char pointer written above, then its length */ + long len = strlen(*getArgReference_str(stk, pci, i)); + getOrSetStructMember(&inputPtr, TYPE_lng, &len, OP_SET); + } + } + } + + weld_value_t arg = weld_value_new(inputStruct); + conf = weld_conf_new(); + weld_value_t result = weld_module_run(m, conf, arg, e); /* NOTE(review): the error state in e is not checked before result is used */ + + /* Retrieve the output */ + char *outputStruct = weld_value_data(result); + for (i = 0; i < pci->retc; i++) { + int type = getArgType(mb, pci, i); + if (isaBatType(type)) { + BAT *b = COLnew(0, getBatType(type), 0, TRANSIENT); /* NOTE(review): b is not checked for NULL, and theap.base is overwritten below without releasing whatever COLnew allocated for it -- presumably a leak, confirm */ + /* TODO handle string columns */ + getOrSetStructMember(&outputStruct, TYPE_ptr, &b->theap.base, OP_GET); + getOrSetStructMember(&outputStruct, TYPE_lng, &b->batCount, OP_GET); + b->theap.free = b->batCount << b->tshift; + b->theap.size = b->batCount << b->tshift; + b->batCapacity = b->batCount; + b->theap.storage = STORE_CMEM; /* presumably marks the heap as memory not owned by GDK's own allocator (here: Weld's result buffer, which is deliberately not freed below) -- TODO confirm how GDK releases it */ + /* TODO check if the sorted props are important for the rest of the execution */ + b->tsorted = b->trevsorted = 0; + BBPkeepref(b->batCacheid); + *getArgReference_bat(stk, pci, i) = b->batCacheid; + }
else { + /* TODO handle strings: a str result is read as a bare char* only; the length that presumably follows it in a Weld vec[i8] is not skipped, so later members would be read from the wrong offset */ + getOrSetStructMember(&outputStruct, type, getArgReference(stk, pci, i), OP_GET); + } + } + + weld_error_free(e); + weld_value_free(arg); + /* TODO does not work as advertised in the doc. It will free our data buffers as well. Consequently result is never freed: its data buffers now back the heaps of the returned BATs */ + //weld_value_free(result); + weld_conf_free(conf); + weld_module_free(m); + free(inputStruct); + return MAL_SUCCEED; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list