Changeset: d38a839b1d0a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d38a839b1d0a
Modified Files:
        monetdb5/extras/pyapi/pyapi.c
        monetdb5/extras/pyapi/pyapi.h
        monetdb5/extras/pyapi/pyapi.mal
        sql/backends/monet5/sql_gencode.c
        sql/include/sql_catalog.h
        sql/server/rel_psm.c
        sql/server/rel_select.c
        sql/server/rel_semantic.c
        sql/server/rel_updates.c
        sql/server/sql_mvc.h
        sql/server/sql_parser.h
        sql/server/sql_parser.y
        sql/server/sql_scan.c
Branch: pythonloader
Log Message:

first stage COPY INTO sometable FROM LOADER somepythonfunction();


diffs (truncated from 368 to 300 lines):

diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -418,6 +418,12 @@ PyAPIevalAggr(Client cntxt, MalBlkPtr mb
 }
 
 str
+PyAPIevalLoader(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    return PyAPIeval(cntxt, mb, stk, pci, 0, 0); /* address of MAL pattern eval_loader; forwards to PyAPIeval with both trailing arguments 0 */
+}
+
+str
 PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
     return PyAPIeval(cntxt, mb, stk, pci, 1, 1);
diff --git a/monetdb5/extras/pyapi/pyapi.h b/monetdb5/extras/pyapi/pyapi.h
--- a/monetdb5/extras/pyapi/pyapi.h
+++ b/monetdb5/extras/pyapi/pyapi.h
@@ -96,6 +96,7 @@ pyapi_export str PyAPIevalStd(Client cnt
 pyapi_export str PyAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 pyapi_export str PyAPIevalStdMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 pyapi_export str PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+pyapi_export str PyAPIevalLoader(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 
 pyapi_export str PyAPIprelude(void *ret);
 
diff --git a/monetdb5/extras/pyapi/pyapi.mal b/monetdb5/extras/pyapi/pyapi.mal
--- a/monetdb5/extras/pyapi/pyapi.mal
+++ b/monetdb5/extras/pyapi/pyapi.mal
@@ -25,6 +25,10 @@ pattern eval_aggr(fptr:ptr,expr:str,arg:
 address PyAPIevalAggr
 comment "grouped aggregates through Python";
 
+pattern eval_loader(fptr:ptr,expr:str,arg:any...):any...
+address PyAPIevalLoader
+comment "loader functions through Python";
+
 # initializer code
 command prelude() :void address PyAPIprelude;
 pyapi.prelude();
@@ -44,6 +48,10 @@ pattern eval_aggr(fptr:ptr,expr:str,arg:
 address PyAPIevalAggr
 comment "grouped aggregates through Python";
 
+pattern eval_loader(fptr:ptr,expr:str,arg:any...):any...
+address PyAPIevalLoader
+comment "loader functions through Python";
+
 module pyapimap;
 
 # The generic Python interface
diff --git a/sql/backends/monet5/sql_gencode.c 
b/sql/backends/monet5/sql_gencode.c
--- a/sql/backends/monet5/sql_gencode.c
+++ b/sql/backends/monet5/sql_gencode.c
@@ -2135,7 +2135,7 @@ static int
                        if (f->func->lang == FUNC_LANG_R || f->func->lang == 
FUNC_LANG_PY || f->func->lang == FUNC_LANG_MAP_PY)
                                q = pushStr(mb, q, f->func->query);
                        /* first dynamic output of copy* functions */
-                       if (f->func->type == F_UNION) 
+                       if (f->func->type == F_UNION || f->func->type == 
F_LOADER)
                                q = table_func_create_result(mb, q, f->func, 
f->res);
                        if (list_length(s->op1->op4.lval))
                                tpe = tail_type(s->op1->op4.lval->h->data);
@@ -3038,6 +3038,10 @@ backend_create_py_func(backend *be, sql_
                f->mod = "pyapi";
                f->imp = "eval_aggr";
                break;
+       case F_LOADER:
+               f->mod = "pyapi";
+               f->imp = "eval_loader";
+               break;
        case  F_PROC: /* no output */
        case  F_FUNC:
        default: /* ie also F_FILT and F_UNION for now */
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -281,6 +281,7 @@ typedef struct sql_arg {
 #define F_FILT 4
 #define F_UNION 5
 #define F_ANALYTIC 6
+#define F_LOADER 7
 
 #define IS_FUNC(f) (f->type == F_FUNC)
 #define IS_PROC(f) (f->type == F_PROC)
@@ -288,6 +289,7 @@ typedef struct sql_arg {
 #define IS_FILT(f) (f->type == F_FILT)
 #define IS_UNION(f) (f->type == F_UNION)
 #define IS_ANALYTIC(f) (f->type == F_ANALYTIC)
+#define IS_LOADER(f) (f->type == F_LOADER)
 
 #define FUNC_LANG_INT 0        /* internal */
 #define FUNC_LANG_MAL 1 /* create sql external mod.func */
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -699,10 +699,12 @@ rel_create_func(mvc *sql, dlist *qname, 
        char is_table = (res && res->token == SQL_TABLE);
        char is_aggr = (type == F_AGGR);
        char is_func = (type != F_PROC);
-       char *F = is_aggr?"AGGREGATE":(is_func?"FUNCTION":"PROCEDURE");
+       char is_loader = (type == F_LOADER); /* true only for CREATE LOADER, so F below names the right statement kind */
+
+       char *F = is_loader?"LOADER":(is_aggr?"AGGREGATE":(is_func?"FUNCTION":"PROCEDURE"));
        char *KF = type==F_FILT?"FILTER ": type==F_UNION?"UNION ": "";
 
-       assert(res || type == F_PROC || type == F_FILT);
+       assert(res || type == F_PROC || type == F_FILT || type == F_LOADER);
 
        if (is_table)
                type = F_UNION;
@@ -784,18 +786,18 @@ rel_create_func(mvc *sql, dlist *qname, 
                                        (lang == 
FUNC_LANG_MAP_PY)?"pyapimap":"unknown";
                                sql->params = NULL;
                                if (create) {
-                                       f = mvc_create_func(sql, sql->sa, s, 
fname, l, restype, type, lang,  mod, fname, lang_body, FALSE, vararg);
+                                       f = mvc_create_func(sql, sql->sa, s, 
fname, l, restype, type, lang,  mod, fname, lang_body, (type == 
F_LOADER)?TRUE:FALSE, vararg);
                                } else if (!sf) {
                                        return sql_error(sql, 01, "CREATE %s%s: 
R function %s.%s not bound", KF, F, s->base.name, fname );
-                               } else {
+                               } /*else {
                                        sql_func *f = sf->func;
                                        f->mod = _STRDUP("rapi");
                                        f->imp = _STRDUP("eval");
                                        if (res && restype)
                                                f->res = restype;
-                                       f->sql = 0; /* native */
+                                       f->sql = 0;
                                        f->lang = FUNC_LANG_INT;
-                               }
+                               }*/
                        } else if (body) {
                                sql_arg *ra = (restype && 
!is_table)?restype->h->data:NULL;
                                list *b = NULL;
diff --git a/sql/server/rel_select.c b/sql/server/rel_select.c
--- a/sql/server/rel_select.c
+++ b/sql/server/rel_select.c
@@ -602,7 +602,7 @@ rel_op_(mvc *sql, sql_schema *s, char *f
 
        f = sql_bind_func(sql->sa, s, fname, NULL, NULL, type);
        if (f && 
-               ((ek.card == card_none && !f->res) || 
+               ((ek.card == card_none && !f->res) ||
                 (ek.card != card_none && f->res))) {
                return exp_op(sql->sa, NULL, f);
        } else {
diff --git a/sql/server/rel_semantic.c b/sql/server/rel_semantic.c
--- a/sql/server/rel_semantic.c
+++ b/sql/server/rel_semantic.c
@@ -155,6 +155,7 @@ rel_semantic(mvc *sql, symbol *s)
        case SQL_DELETE:
        case SQL_COPYFROM:
        case SQL_BINCOPYFROM:
+       case SQL_COPYLOADER:
        case SQL_COPYTO:
                return rel_updates(sql, s);
 
diff --git a/sql/server/rel_updates.c b/sql/server/rel_updates.c
--- a/sql/server/rel_updates.c
+++ b/sql/server/rel_updates.c
@@ -15,6 +15,7 @@
 #include "sql_privileges.h"
 #include "rel_optimizer.h"
 #include "rel_dump.h"
+#include "rel_psm.h"
 #include "sql_symbol.h"
 
 static sql_exp *
@@ -1452,6 +1453,86 @@ bincopyfrom(mvc *sql, dlist *qname, dlis
        return res;
 }
 
+
+/*
+ * Build the relational plan for
+ *     COPY INTO <table> FROM LOADER <function>();
+ *
+ * qname - (optionally schema-qualified) name of the target table.
+ * fcall - parsed loader call; the first element of its list holds the
+ *         (optionally schema-qualified) function name.
+ *
+ * The loader is looked up as an F_LOADER function without arguments and
+ * its result types are set to the column types of the target table.
+ * Returns the insert relation fed by the loader's table-function result,
+ * or NULL on failure.
+ */
+static sql_rel *
+copyfromloader(mvc *sql, dlist *qname, symbol *fcall)
+{
+       char *sname = qname_schema(qname);
+       char *tname = qname_table(qname);
+
+       sql_schema *s = NULL;
+       sql_table *t = NULL;
+
+       node *n;
+       sql_rel *res;
+       list *exps, *args = NULL;
+       sql_exp *import;
+       dnode *l = fcall->data.lval->h;
+       char *fname = qname_fname(l->data.lval);
+       char *f_sname = qname_schema(l->data.lval);
+       /* the loader is searched in the session schema unless it is qualified */
+       sql_schema *f_s = sql->session->schema;
+       sql_subfunc *f = NULL;
+
+       if (!copy_allowed(sql, 1)) {
+               (void) sql_error(sql, 02, "COPY INTO: insufficient privileges: "
+                               "binary COPY INTO requires database administrator rights");
+               return NULL;
+       }
+
+       /* resolve the target table: explicit schema, current schema, then tmp/stack */
+       if (sname && !(s=mvc_bind_schema(sql, sname))) {
+               (void) sql_error(sql, 02, "3F000!COPY INTO: no such schema '%s'", sname);
+               return NULL;
+       }
+       if (!s)
+               s = cur_schema(sql);
+       t = mvc_bind_table(sql, s, tname);
+       if (!t && !sname) {
+               s = tmp_schema(sql);
+               t = mvc_bind_table(sql, s, tname);
+               if (!t)
+                       t = stack_find_table(sql, tname);
+       }
+       if (insert_allowed(sql, t, tname, "COPY INTO", "copy into") == NULL) {
+               return NULL;
+       }
+
+       /*
+        * Resolve the loader's schema from the loader's own qualifier
+        * (f_sname), not from the table's (sname): the two names are
+        * qualified independently of each other.
+        */
+       if (f_sname) {
+               f_s = mvc_bind_schema(sql, f_sname);
+               if (!f_s) {
+                       (void) sql_error(sql, 02, "3F000!COPY INTO: no such schema '%s'", f_sname);
+                       return NULL;
+               }
+       }
+
+       /* TODO: handle parameters to bind correct version (args stays NULL for now) */
+
+       f = sql_bind_func(sql->sa, f_s, fname, NULL, NULL, F_LOADER);
+       if (!f) {
+               (void) sql_error(sql, 02, "3F000!COPY INTO: no such loader function '%s'", fname);
+               return NULL;
+       }
+       /* the loader must produce exactly the columns of the target table */
+       f->res = table_column_types(sql->sa, t);
+
+       import = exp_op(sql->sa, args, f);
+       if (!import) {
+               return NULL;
+       }
+
+       /* one output expression per table column, in table order */
+       exps = new_exp_list(sql->sa);
+       for (n = t->columns.set->h; n; n = n->next) {
+               sql_column *c = n->data;
+               append(exps, exp_column(sql->sa, t->base.name, c->base.name, &c->type, CARD_MULTI, c->null, 0));
+       }
+       res = rel_table_func(sql->sa, NULL, import, exps, 1);
+       res = rel_insert_table(sql, t, t->base.name, res);
+       return res;
+}
+
 static sql_rel *
 rel_output(mvc *sql, sql_rel *l, sql_exp *sep, sql_exp *rsep, sql_exp *ssep, 
sql_exp *null_string, sql_exp *file) 
 {
@@ -1614,6 +1695,14 @@ rel_updates(mvc *sql, symbol *s)
                sql->type = Q_UPDATE;
        }
                break;
+       case SQL_COPYLOADER:
+       {
+               dlist *l = s->data.lval;
+
+               ret = copyfromloader(sql, l->h->data.lval, 
l->h->next->data.sym);
+               sql->type = Q_UPDATE;
+       }
+               break;
        case SQL_COPYTO:
        {
                dlist *l = s->data.lval;
diff --git a/sql/server/sql_mvc.h b/sql/server/sql_mvc.h
--- a/sql/server/sql_mvc.h
+++ b/sql/server/sql_mvc.h
@@ -37,6 +37,8 @@
 #define card_column    2
 #define card_set       3 /* some operators require only a set (IN/EXISTS) */
 #define card_relation  4
+
+
 /* allowed to reduce (in the where and having parts we can reduce) */
 
 /* different query execution modes (emode) */
diff --git a/sql/server/sql_parser.h b/sql/server/sql_parser.h
--- a/sql/server/sql_parser.h
+++ b/sql/server/sql_parser.h
@@ -148,6 +148,7 @@ typedef enum tokens {
        SQL_ESCAPE,
        SQL_COPYFROM,
        SQL_BINCOPYFROM,
+       SQL_COPYLOADER,
        SQL_COPYTO,
        SQL_EXPORT,
        SQL_NEXT,
diff --git a/sql/server/sql_parser.y b/sql/server/sql_parser.y
--- a/sql/server/sql_parser.y
+++ b/sql/server/sql_parser.y
@@ -583,7 +583,7 @@ SQLCODE SQLERROR UNDER WHENEVER
 %token TEMP TEMPORARY STREAM MERGE REMOTE REPLICA
 %token<sval> ASC DESC AUTHORIZATION
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to