Changeset: 574c9288edd4 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=574c9288edd4
Modified Files:
        sql/backends/monet5/rel_weld.c
Branch: rel-weld
Log Message:

Weld implementation of ORDER BY: not working yet; depends on Weld issue #347.


diffs (159 lines):

diff --git a/sql/backends/monet5/rel_weld.c b/sql/backends/monet5/rel_weld.c
--- a/sql/backends/monet5/rel_weld.c
+++ b/sql/backends/monet5/rel_weld.c
@@ -40,6 +40,7 @@
  * -  String BATs are backed by 2 arrays: one with the strings and the other 
with the offsets. At the end of the Weld
  *    program we need to also return the strings array so that we can later 
build a string BAT, so special care is
  *    need to ensure that the strings array is referenced correctly throughout 
the program.
+ * -  Weld only supports sorting in ascending order for now
  * */
 
 #define STR_BUF_SIZE 4096
@@ -350,17 +351,56 @@ select_produce(backend *be, sql_rel *rel
 static int
 project_produce(backend *be, sql_rel *rel, weld_state *wstate)
 {
+       char weld_stmt[STR_BUF_SIZE * 2];
+       char col_name[256];
+       int len = 0, i, count;
+       node *en;
+       sql_exp *exp;
+       list* col_list = sa_list(be->mvc->sa);
+       list* exp_list = sa_list(be->mvc->sa);
+
        /* === Produce === */
+       int old_num_parens = wstate->num_parens;
+       int old_num_loops = wstate->num_loops;
+       str old_builder = wstate->builder;
+       int result_var = 0;
+       if (rel->r) {
+               /* Order by statement */
+               wstate->num_parens = wstate->num_loops = 0;
+               result_var = wstate->next_var++;
+               wstate->num_parens++;
+               sprintf(weld_stmt, "let v%d = (", result_var);
+               append_weld_stmt(wstate, weld_stmt);
+
+               /* New builder */
+               len = sprintf(weld_stmt, "appender[{");
+               exp_list = list_merge(exp_list, rel->exps, NULL);
+               exp_list = list_merge(exp_list, rel->r, NULL);
+               for (en = exp_list->h; en; en = en->next) {
+                       exp = en->data;
+                       sprintf(col_name, "%s_%s", exp->rname ? exp->rname : 
exp->name, exp->name);
+                       if (list_find(col_list, col_name, (fcmp)strcmp)) {
+                               /* column already added from projection */
+                               continue;
+                       }
+                       int type = exp_subtype(exp)->type->localtype;
+                       if (type == TYPE_str) {
+                               len += sprintf(weld_stmt + len, "?,");
+                       } else {
+                               len += sprintf(weld_stmt + len, "%s,", 
getWeldType(type));
+                       }
+                       list_append(col_list, sa_strdup(be->mvc->sa, col_name));
+               }
+               len += sprintf(weld_stmt + len - 1, "}]") - 1;  /* also replace 
the last comma */
+               wstate->builder = weld_stmt;
+       }
+
        produce_func input_produce = getproduce_func(rel->l);
        if (input_produce == NULL) return -1;
        if (input_produce(be, rel->l, wstate) != 0) return -1;
 
        /* === Consume === */
-       char weld_stmt[STR_BUF_SIZE * 2];
-       char col_name[256];
-       int len = 0;
-       node *en;
-       sql_exp *exp;
+       len = 0;
        for (en = rel->exps->h; en; en = en->next) {
                exp = en->data;
                sprintf(col_name, "%s_%s", exp->rname ? exp->rname : exp->name, 
exp->name);
@@ -385,6 +425,74 @@ project_produce(backend *be, sql_rel *re
                        len += sprintf(weld_stmt + len, ";");
                }
        }
+       if (rel->r) {
+               /* Sorting phase - begin by materializing the columns in an 
array of structs */
+               len += sprintf(weld_stmt + len, "merge(b%d, {", 
wstate->num_loops);
+               list* col_list = sa_list(be->mvc->sa);
+               for (en = exp_list->h; en; en = en->next) {
+                       exp = en->data;
+                       sprintf(col_name, "%s_%s", exp->rname ? exp->rname : 
exp->name, exp->name);
+                       if (list_find(col_list, col_name, (fcmp)strcmp)) {
+                               /* column already added from projection */
+                               continue;
+                       }
+                       if (exp_subtype(exp)->type->localtype == TYPE_str) {
+                               len += sprintf(weld_stmt + len, "%s_stridx,", 
col_name);
+                       } else {
+                               len += sprintf(weld_stmt + len, "%s,", 
col_name);
+                       }
+                       list_append(col_list, sa_strdup(be->mvc->sa, col_name));
+               }
+               weld_stmt[len - 1] = '}';
+               for (i = 0; i < wstate->num_parens + 1; i++) {
+                       len += sprintf(weld_stmt + len, ")");
+               }
+               len += sprintf(weld_stmt + len, ";");
+               /* Sort the array of structs */
+               wstate->next_var++;
+               len += sprintf(weld_stmt + len, "let v%d = sort(result(v%d), 
|n| ", wstate->next_var, result_var);
+               for (en = ((list*)rel->r)->h; en; en = en->next) {
+                       exp = en->data;
+                       sprintf(col_name, "%s_%s", exp->rname ? exp->rname : 
exp->name, exp->name);
+                       node *col_list_node = list_find(col_list, col_name, 
(fcmp)strcmp);
+                       int idx = list_position(col_list, col_list_node->data);
+                       if (exp_subtype(exp)->type->localtype == TYPE_str) {
+                               len += sprintf(weld_stmt + len, "let %s = 
strslice(%s_strcol, i64(n.$%d) + %s_stroffset);",
+                                       col_name, col_name, idx, col_name);
+                       } else {
+                               len += sprintf(weld_stmt + len, "let %s = 
n.$%d;", col_name, idx);
+                       }
+               }
+               len += sprintf(weld_stmt + len, "{");
+               for (en = ((list*)rel->r)->h; en; en = en->next) {
+                       exp = en->data;
+                       sprintf(col_name, "%s_%s", exp->rname ? exp->rname : 
exp->name, exp->name);
+                       len += sprintf(weld_stmt + len, "%s", col_name);
+                       if (en->next != NULL) {
+                               len += sprintf(weld_stmt + len, ", ");
+                       }
+               }
+               len += sprintf(weld_stmt + len, "});");
+               /* Resume the pipeline */
+               wstate->num_parens = old_num_parens;
+               wstate->num_loops = old_num_loops;
+               wstate->builder = old_builder;
+               wstate->num_loops++;
+               wstate->num_parens++;
+               len += sprintf(weld_stmt + len, "for(v%d, %s, |b%d, i%d, n%d|", 
wstate->next_var,
+                                          wstate->builder, wstate->num_loops, 
wstate->num_loops, wstate->num_loops);
+               for (en = rel->exps->h, count = 0; en; en = en->next, count++) {
+                       exp = en->data;
+                       sprintf(col_name, "%s_%s", exp->rname ? exp->rname : 
exp->name, exp->name);
+                       if (exp_subtype(exp)->type->localtype == TYPE_str) {
+                               len += sprintf(weld_stmt + len, "let %s = 
strslice(%s_strcol, i64(n%d.$%d) + %s_stroffset);",
+                                               col_name, col_name, 
wstate->num_loops, count, col_name);
+                               len += sprintf(weld_stmt + len, "let %s_stridx 
= n%d.$%d;", col_name, wstate->num_loops, count);
+                       } else {
+                               len += sprintf(weld_stmt + len, "let %s = 
n%d.$%d;", col_name, wstate->num_loops, count);
+                       }
+               }
+       }
        append_weld_stmt(wstate, weld_stmt);
        return 0;
 }
@@ -487,8 +595,9 @@ root_produce(backend *be, sql_rel *rel)
        int result_is_bat = rel_returns_bat(root);
        /* Save the builders in a variable */
        int result_var = wstate->next_var++;
-       sprintf(weld_stmt, "let v%d = ", result_var);
+       sprintf(weld_stmt, "let v%d = (", result_var);
        append_weld_stmt(wstate, weld_stmt);
+       wstate->num_parens++;
        /* Prepare the builders */
        if (result_is_bat) {
                len += sprintf(weld_stmt + len, "{");
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to