Changeset: 4b9abd901b85 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4b9abd901b85
Added Files:
        monetdb5/modules/mal/mal_weld.h
Modified Files:
        gdk/Makefile.ag
        monetdb5/modules/mal/Makefile.ag
        monetdb5/modules/mal/mal_weld.c
        monetdb5/modules/mal/mal_weld.mal
        monetdb5/modules/mal/mal_weld.mal.sh
Branch: mal-weld
Log Message:

weld code for tpch06


diffs (truncated from 1187 to 300 lines):

diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
--- a/gdk/Makefile.ag
+++ b/gdk/Makefile.ag
@@ -6,8 +6,7 @@
 
 MTSAFE
 
-INCLUDES = ../common/options ../common/stream ../common/utils \
-                  $(valgrind_CFLAGS) $(WELD_INCS)
+INCLUDES = ../common/options ../common/stream ../common/utils $(valgrind_CFLAGS)
 
 lib_gdk = {
        VERSION = $(GDK_VERSION)
@@ -40,8 +39,7 @@ lib_gdk = {
                ../common/stream/libstream \
                ../common/utils/libmutils \
                $(MATH_LIBS) $(SOCKET_LIBS) $(zlib_LIBS) $(BZ_LIBS) \
-               $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) $(KVM_LIBS) \
-               $(WELD_LIBS)
+               $(MALLOC_LIBS) $(PTHREAD_LIBS) $(DL_LIBS) $(PSAPILIB) $(KVM_LIBS)
 }
 
 headers_h = {
diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag
--- a/monetdb5/modules/mal/Makefile.ag
+++ b/monetdb5/modules/mal/Makefile.ag
@@ -11,7 +11,7 @@ INCLUDES = ../../mal ../atoms ../kernel 
        ../../../common/utils \
        ../../../gdk \
        $(pcre_CFLAGS) $(zlib_CFLAGS) $(BZIP_INCS) $(MSGCONTROL_FLAGS) \
-       $(openssl_CFLAGS)
+       $(openssl_CFLAGS) $(WELD_INCS)
 
 MTSAFE
 
@@ -30,7 +30,7 @@ lib_mal = {
                language.c language.h \
                mal_io.c mal_io.h \
                mal_mapi.c mal_mapi.h \
-               mal_weld.c \
+               mal_weld.c mal_weld.h \
                manual.c manual.h \
                mat.c mat.h \
                mdb.c mdb.h \
@@ -51,6 +51,7 @@ lib_mal = {
                sample.c sample.h \
                json_util.c json_util.h \
                calc.c batcalc.c
+       LIBS = $(WELD_LIBS)
 }
 
 headers_mal = {
diff --git a/monetdb5/modules/mal/mal_weld.c b/monetdb5/modules/mal/mal_weld.c
--- a/monetdb5/modules/mal/mal_weld.c
+++ b/monetdb5/modules/mal/mal_weld.c
@@ -10,60 +10,315 @@
+#include <stdint.h>
 #include "gdk.h"
 #include "mal_exception.h"
 #include "mal_interpreter.h"
+#include "mal_instruction.h"
+#include "mal_weld.h"
+#include "weld.h"
 
-mal_export str WeldInitState(ptr *retval);
+/* Growth step (in bytes) of the statement buffers built in WeldRun; also the
+ * size of the static buffer returned by getWeldCandList. */
+#define STR_SIZE_INC 4096
+/* Direction argument of getOrSetStructMember: read a struct member (OP_GET)
+ * or write it (OP_SET). */
+#define OP_GET 0
+#define OP_SET 1
+
+/* Variables in Weld will be named vXX - e.g. v19 
+ * i.e. "v" followed by the MAL variable number returned by getArg().
+ * */
+
+/* Align *ADDR up to a multiple of sizeof(TYPE), then either read the TYPE
+ * value stored there into *VALUE (OP == OP_GET) or store *VALUE there
+ * (OP_SET), and finally advance *ADDR past the member. ADDR is a char **
+ * cursor walking a buffer laid out like a Weld struct. The pointer is
+ * converted to uintptr_t (not long) because long is narrower than a pointer
+ * on LLP64 platforms. Alignment assumes alignof(TYPE) == sizeof(TYPE).
+ */
+#define getOrSetStructMemberImpl(ADDR, TYPE, VALUE, OP)                        \
+       do {                                                                     \
+               if ((uintptr_t) *(ADDR) % sizeof(TYPE) != 0) /* align */         \
+                       *(ADDR) += sizeof(TYPE) - (uintptr_t) *(ADDR) % sizeof(TYPE); \
+               if ((OP) == OP_GET)                                              \
+                       *(TYPE *) (VALUE) = *(TYPE *) (*(ADDR)); /* get */       \
+               else                                                             \
+                       *(TYPE *) (*(ADDR)) = *(TYPE *) (VALUE); /* set */       \
+               *(ADDR) += sizeof(TYPE);                         /* advance */   \
+       } while (0)
+
+/* Insert weldStmt in front of the Weld program text held by wstate, growing
+ * the program buffer when needed. On allocation failure the program (and its
+ * buffer) is left unchanged instead of being lost through realloc.
+ */
+static void prependWeldStmt(weldState *wstate, str weldStmt) {
+       size_t progLen = strlen(wstate->program);
+       size_t stmtLen = strlen(weldStmt);
+       if (progLen + stmtLen >= wstate->programMaxLen) {
+               /* grow geometrically so repeated small inserts stay cheap */
+               size_t newMaxLen = 2 * (progLen + stmtLen) + 1;
+               char *newProgram = realloc(wstate->program, newMaxLen * sizeof(char));
+               if (newProgram == NULL)
+                       return; /* TODO report the failure; the old buffer is still valid */
+               wstate->program = newProgram;
+               wstate->programMaxLen = newMaxLen;
+       }
+       /* shift the current text, including its terminator, to make room */
+       memmove(wstate->program + stmtLen, wstate->program, progLen + 1);
+       memcpy(wstate->program, weldStmt, stmtLen);
+}
+
+/* Append weldStmt to the Weld program text held by wstate, growing the
+ * program buffer when needed. On allocation failure the program (and its
+ * buffer) is left unchanged instead of being lost through realloc.
+ */
+static void appendWeldStmt(weldState *wstate, str weldStmt) {
+       size_t progLen = strlen(wstate->program);
+       size_t stmtLen = strlen(weldStmt);
+       if (progLen + stmtLen >= wstate->programMaxLen) {
+               /* grow geometrically so repeated small appends stay cheap */
+               size_t newMaxLen = 2 * (progLen + stmtLen) + 1;
+               char *newProgram = realloc(wstate->program, newMaxLen * sizeof(char));
+               if (newProgram == NULL)
+                       return; /* TODO report the failure; the old buffer is still valid */
+               wstate->program = newProgram;
+               wstate->programMaxLen = newMaxLen;
+       }
+       /* copy at the known end instead of letting strcat rescan the buffer */
+       memcpy(wstate->program + progLen, weldStmt, stmtLen + 1);
+}
+
+/* Name of the Weld type that represents MonetDB atom type `type`.
+ * Strings become byte vectors. A type not listed here is resolved through its
+ * storage type (ATOMstorage); NULL means the type is not supported.
+ */
+static str getWeldType(int type) {
+       if (type == TYPE_bte) return "i8";
+       if (type == TYPE_int) return "i32";
+       if (type == TYPE_lng) return "i64";
+       if (type == TYPE_flt) return "f32";
+       if (type == TYPE_dbl) return "f64";
+       if (type == TYPE_str) return "vec[i8]";
+       /* fall back to the type the atom is physically stored as */
+       return ATOMstorage(type) == type ? NULL : getWeldType(ATOMstorage(type));
+}
+
+/* Read (op == OP_GET) or write (op == OP_SET) one member of MonetDB type
+ * `type` in a buffer laid out like a Weld struct. *addr is aligned to the
+ * member first and advanced past it afterwards, so successive calls walk the
+ * struct member by member. value points at the MonetDB-side copy of the
+ * member. Unhandled types whose storage type is themselves leave *addr
+ * untouched.
+ */
+static void getOrSetStructMember(char **addr, int type, void *value, int op) {
+       if (type == TYPE_bte) {
+               getOrSetStructMemberImpl(addr, bte, value, op);
+       } else if (type == TYPE_int) {
+               getOrSetStructMemberImpl(addr, int, value, op);
+       } else if (type == TYPE_lng) {
+               /* lng is always 64 bits (Weld i64); long is only 32 bits on LLP64 */
+               getOrSetStructMemberImpl(addr, lng, value, op);
+       } else if (type == TYPE_flt) {
+               getOrSetStructMemberImpl(addr, flt, value, op);
+       } else if (type == TYPE_dbl) {
+               getOrSetStructMemberImpl(addr, dbl, value, op);
+       } else if (type == TYPE_str || type == TYPE_ptr) {
+               /* TODO - will assume that all pointers have the same size */
+               getOrSetStructMemberImpl(addr, char *, value, op);
+       } else if (ATOMstorage(type) != type) {
+               /* no `return <expr>;` here: that is invalid in a void function */
+               getOrSetStructMember(addr, ATOMstorage(type), value, op);
+       }
+}
+
+/* Candidate lists can be dense so we have to replace them with a rangeiter.
+ * Returns the Weld expression to iterate over candidate list s (MAL variable
+ * sid): a rangeiter over the oids of a dense list, otherwise the Weld variable
+ * v<sid>. The result lives in a static buffer shared by all callers and is
+ * overwritten by the next call.
+ */
+static str getWeldCandList(int sid, bat s) {
+       static char candList[STR_SIZE_INC];
+       BAT *list = is_bat_nil(s) ? NULL : BATdescriptor(s);
+       if (list == NULL || !BATtdense(list)) {
+               snprintf(candList, sizeof(candList), "v%d", sid);
+       } else {
+               /* oid/BUN are not long on every platform: print them as long long */
+               snprintf(candList, sizeof(candList), "rangeiter(%lldL, %lldL, 1L)",
+                        (long long) list->tseqbase,
+                        (long long) (list->tseqbase + BATcount(list)));
+       }
+       if (list != NULL)
+               BBPunfix(list->batCacheid); /* drop the reference BATdescriptor took */
+       return candList;
+}
+
+/* Debug helper: write the Weld program to a new temporary file and report its
+ * path on stderr. A newline replaces every blank that is followed by a tab,
+ * and one is added after every ';'. Does nothing if no file can be created.
+ */
+static void dumpWeldProgram(weldState *wstate) {
+       char path[L_tmpnam];
+       const char *prog = wstate->program;
+       size_t len = strlen(prog), i;
+       FILE *f;
+
+       /* tmpnam is racy, but acceptable for debug-only output */
+       if (tmpnam(path) == NULL || (f = fopen(path, "w")) == NULL)
+               return;
+       for (i = 0; i < len; i++) {
+               /* prog[i + 1] is at most the terminating NUL */
+               if (prog[i] == ' ' && prog[i + 1] == '\t')
+                       fputc('\n', f);
+               else
+                       fputc(prog[i], f);
+               if (prog[i] == ';')
+                       fputc('\n', f);
+       }
+       fclose(f);
+       fprintf(stderr, "#weld program dumped to %s\n", path);
+}
+
 str
 WeldInitState(ptr *retval)
 {
-       (void) retval;
+       weldState *wstate = malloc(sizeof(weldState));
+       wstate->programMaxLen = 1;
+       wstate->program = calloc(wstate->programMaxLen, sizeof(char));
+       *retval = wstate;
        return MAL_SUCCEED;
 }
 
-mal_export str WeldRun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+/* Compile and execute the Weld program accumulated in the weldState argument
+ * (at pci->retc).
+ *
+ * The arguments after the weldState become the parameters of the Weld lambda:
+ * a BAT contributes its data vector and its hseqbase, a scalar its value (a
+ * string also its length). The program returns a struct holding the MAL return
+ * variables; BAT results adopt the data buffers produced by Weld.
+ *
+ * The weldState and its program are freed on every path, success or error.
+ * Input BATs stay fixed while Weld runs and are released afterwards.
+ */
 str
 WeldRun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
-       (void) cntxt;
-       (void) mb;
-       (void) stk;
-       (void) pci;
-       return MAL_SUCCEED;
+       weldState *wstate = *getArgReference_ptr(stk, pci, pci->retc);
+       str msg = MAL_SUCCEED;
+       int i;
+       size_t inputLen = 0, inputMaxLen = 0, outputLen = 0, outputMaxLen = 0;
+       char *inputStmt = NULL, *outputStmt = NULL, *tmp;
+       char *inputStruct = NULL, *inputPtr, *outputStruct;
+       BAT **inputBats = NULL; /* fixed input BATs, indexed by argument number */
+       weld_error_t e = NULL;
+       weld_conf_t conf = NULL;
+       weld_module_t m = NULL;
+       weld_value_t arg = NULL, result = NULL;
+
+       (void)cntxt;
+       (void)dumpWeldProgram; /* suppress the unused warning */
+
+       /* Build the input stmt, e.g.: |v13:i32, v50:vec[i8], v50hseqbase:i64| */
+       for (i = pci->retc + 1; i < pci->argc; i++) { /* skip wstate on pci->retc */
+               int type = getArgType(mb, pci, i);
+               int atom = isaBatType(type) ? getBatType(type) : type;
+               if (getWeldType(atom) == NULL) {
+                       msg = createException(MAL, "weld.run", PROGRAM_GENERAL ": unsupported type for v%d", getArg(pci, i));
+                       goto cleanup;
+               }
+               if (inputLen + 128 > inputMaxLen) {
+                       inputMaxLen += STR_SIZE_INC;
+                       if ((tmp = realloc(inputStmt, inputMaxLen * sizeof(char))) == NULL)
+                               goto memfail;
+                       inputStmt = tmp;
+               }
+               if (isaBatType(type)) {
+                       inputLen += sprintf(inputStmt + inputLen, " v%d:vec[%s], v%dhseqbase:i64,",
+                                           getArg(pci, i), getWeldType(atom), getArg(pci, i));
+               } else {
+                       inputLen += sprintf(inputStmt + inputLen, " v%d:%s,",
+                                           getArg(pci, i), getWeldType(atom));
+               }
+       }
+       if (inputLen == 0) {
+               /* NOTE(review): assumes Weld accepts a parameterless "| |" lambda */
+               char noParams[] = "| |";
+               prependWeldStmt(wstate, noParams);
+       } else {
+               /* turn the leading blank and the trailing comma into the bars */
+               inputStmt[0] = '|';
+               inputStmt[inputLen - 1] = '|';
+               prependWeldStmt(wstate, inputStmt);
+       }
+
+       /* Build the output stmt, e.g.: {v1, v99} */
+       for (i = 0; i < pci->retc; i++) {
+               if (outputLen + 128 > outputMaxLen) {
+                       outputMaxLen += STR_SIZE_INC;
+                       if ((tmp = realloc(outputStmt, outputMaxLen * sizeof(char))) == NULL)
+                               goto memfail;
+                       outputStmt = tmp;
+               }
+               outputLen += sprintf(outputStmt + outputLen, " v%d,", getArg(pci, i));
+       }
+       if (outputLen == 0) {
+               char noResults[] = "{}";
+               appendWeldStmt(wstate, noResults);
+       } else {
+               outputStmt[0] = '{';
+               outputStmt[outputLen - 1] = '}';
+               appendWeldStmt(wstate, outputStmt);
+       }
+
+       e = weld_error_new();
+       conf = weld_conf_new();
+#ifdef WELD_DEBUG
+       dumpWeldProgram(wstate);
+       weld_conf_set(conf, "weld.compile.dumpCode", "true");
+       weld_conf_set(conf, "weld.compile.dumpCodeDir", "/tmp");
+#endif
+       m = weld_module_compile(wstate->program, conf, e);
+       weld_conf_free(conf);
+       conf = NULL;
+       if (weld_error_code(e)) {
+               msg = createException(MAL, "weld.run", PROGRAM_GENERAL ": %s", weld_error_message(e));
+               goto cleanup;
+       }
+
+       /* Prepare the input for Weld. We're building an array that has the layout of a struct.
+        * Max possible size is when we only have bats, so we have 1 ptr for the array,
+        * 1 lng for the size and 1 lng for hseqbase per argument */
+       inputStruct = malloc((pci->argc - pci->retc) * (sizeof(void *) + 2 * sizeof(lng)));
+       inputBats = calloc(pci->argc, sizeof(BAT *));
+       if (inputStruct == NULL || inputBats == NULL)
+               goto memfail;
+       inputPtr = inputStruct;
+       for (i = pci->retc + 1; i < pci->argc; i++) { /* skip wstate on pci->retc */
+               int type = getArgType(mb, pci, i);
+               if (isaBatType(type)) {
+                       lng cnt, hseq;
+                       BAT *b = BATdescriptor(*getArgReference_bat(stk, pci, i));
+                       if (b == NULL) {
+                               msg = createException(MAL, "weld.run", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+                               goto cleanup;
+                       }
+                       inputBats[i] = b; /* keep it fixed while Weld reads its heap */
+                       /* BUN/oid may be narrower than the i64 Weld expects */
+                       cnt = (lng) BATcount(b);
+                       hseq = (lng) b->hseqbase;
+                       /* TODO handle string colums */
+                       getOrSetStructMember(&inputPtr, TYPE_ptr, &b->theap.base, OP_SET);
+                       getOrSetStructMember(&inputPtr, TYPE_lng, &cnt, OP_SET);
+                       getOrSetStructMember(&inputPtr, TYPE_lng, &hseq, OP_SET);
+               } else {
+                       getOrSetStructMember(&inputPtr, type, getArgReference(stk, pci, i), OP_SET);
+                       if (type == TYPE_str) {
+                               lng len = (lng) strlen(*getArgReference_str(stk, pci, i));
+                               getOrSetStructMember(&inputPtr, TYPE_lng, &len, OP_SET);
+                       }
+               }
+       }
+
+       arg = weld_value_new(inputStruct);
+       conf = weld_conf_new();
+       result = weld_module_run(m, conf, arg, e);
+       if (weld_error_code(e)) {
+               msg = createException(MAL, "weld.run", PROGRAM_GENERAL ": %s", weld_error_message(e));
+               goto cleanup;
+       }
+
+       /* Retrieve the output */
+       outputStruct = weld_value_data(result);
+       for (i = 0; i < pci->retc; i++) {
+               int type = getArgType(mb, pci, i);
+               if (isaBatType(type)) {
+                       lng cnt;
+                       BAT *b = COLnew(0, getBatType(type), 0, TRANSIENT);
+                       if (b == NULL)
+                               goto memfail;
+                       /* the BAT adopts Weld's buffer: release the heap COLnew allocated */
+                       if (b->theap.storage == STORE_MEM && b->theap.base != NULL)
+                               GDKfree(b->theap.base);
+                       /* TODO handle string columns */
+                       getOrSetStructMember(&outputStruct, TYPE_ptr, &b->theap.base, OP_GET);
+                       getOrSetStructMember(&outputStruct, TYPE_lng, &cnt, OP_GET);
+                       b->batCount = (BUN) cnt;
+                       b->theap.free = b->batCount << b->tshift;
+                       b->theap.size = b->batCount << b->tshift;
+                       b->batCapacity = b->batCount;
+                       b->theap.storage = STORE_CMEM; /* not owned by GDK */
+                       /* TODO check if the sorted props are important for the rest of the execution */
+                       b->tsorted = b->trevsorted = 0;
+                       BBPkeepref(b->batCacheid);
+                       *getArgReference_bat(stk, pci, i) = b->batCacheid;
+               } else {
+                       /* TODO handle strings */
+                       getOrSetStructMember(&outputStruct, type, getArgReference(stk, pci, i), OP_GET);
+               }
+       }
+       goto cleanup;
+
+memfail:
+       msg = createException(MAL, "weld.run", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+cleanup:
+       if (arg != NULL)
+               weld_value_free(arg);
+       /* TODO weld_value_free(result) does not work as advertised in the doc:
+        * it also frees the data buffers now owned by the output BATs */
+       if (conf != NULL)
+               weld_conf_free(conf);
+       if (m != NULL)
+               weld_module_free(m);
+       if (e != NULL)
+               weld_error_free(e);
+       /* Weld is done with the input heaps: release the BATs fixed above */
+       if (inputBats != NULL) {
+               for (i = pci->retc + 1; i < pci->argc; i++) {
+                       if (inputBats[i] != NULL)
+                               BBPunfix(inputBats[i]->batCacheid);
+               }
+               free(inputBats);
+       }
+       free(inputStruct);
+       free(inputStmt);
+       free(outputStmt);
+       free(wstate->program);
+       free(wstate);
+       return msg;
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to