Changeset: 6bd0c0fd359a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6bd0c0fd359a
Modified Files:
        sql/backends/monet5/sql_cat.c
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/backends/monet5/sqlcatalog.mal
        sql/include/sql_catalog.h
        sql/server/rel_psm.c
        sql/server/rel_semantic.c
        sql/server/sql_mvc.c
        sql/server/sql_mvc.h
        sql/server/sql_parser.h
        sql/server/sql_parser.y
        sql/server/sql_scan.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: timetrails
Log Message:

Added 'START CONTINUOUS PROCEDURE qname', 'INTERRUPT CONTINUOUS PROCEDURE 
qname' and 'HALT CONTINUOUS PROCEDURE qname' SQL statements. For now we use 
INTERRUPT and HALT instead of PAUSE and STOP respectively, because these words 
are used in the system functions sys.pause and sys.stop


diffs (truncated from 912 to 300 lines):

diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -24,6 +24,7 @@
 #include "opt_prelude.h"
 #include "querylog.h"
 #include "mal_builder.h"
+#include "mal_client.h"
 #include "mal_debugger.h"
 
 #include <rel_select.h>
@@ -48,7 +49,7 @@ static char *
 SaveArgReference(MalStkPtr stk, InstrPtr pci, int arg)
 {   
     char *val = *getArgReference_str(stk, pci, arg);
-    
+
     if (val && strcmp(val, str_nil) == 0)
         val = NULL;
     return val;
@@ -712,36 +713,58 @@ continuous_procedure(mvc *sql, char *sna
 {
        sql_schema *s = NULL;
        char *F;
+       Client cntxt;
+       str petrinetResponse = MAL_SUCCEED;
 
        switch (action) {
                case START_CONTINUOUS_PROCEDURE:
                        F = "START CONTINUOUS PROCEDURE";
                        break;
-               case PAUSE_CONTINUOUS_PROCEDURE:
-                       F = "PAUSE CONTINUOUS PROCEDURE";
+               case INTERRUPT_CONTINUOUS_PROCEDURE:
+                       F = "INTERRUPT CONTINUOUS PROCEDURE";
                        break;
-               case STOP_CONTINUOUS_PROCEDURE:
-                       F = "STOP CONTINUOUS PROCEDURE";
+               case HALT_CONTINUOUS_PROCEDURE:
+                       F = "HALT CONTINUOUS PROCEDURE";
                        break;
        }
 
        if (sname && !(s = mvc_bind_schema(sql, sname)))
-               return sql_message("3F000!%s CONTINUOUS PROCEDURE: no such 
schema '%s'", F, sname);
+               return sql_message("3F000!%s: no such schema '%s'", F, sname);
        if (!s)
                s = cur_schema(sql);
        if (fid >= 0) {
                node *n = find_sql_func_node(s, fid);
                if (n) {
                        sql_func *func = n->data;
-
                        if (!mvc_schema_privs(sql, s)) {
-                               return sql_message("%s: access denied for %s to 
schema ;'%s'", F, stack_get_string(sql, "current_user"), s->base.name);
+                               return sql_message("3F000!%s: access denied for 
%s to schema ;'%s'", F, stack_get_string(sql, "current_user"), s->base.name);
                        }
-
-                       mvc_continuous_procedure(sql, s, func, action);
+                       switch (action) {
+                               case START_CONTINUOUS_PROCEDURE: {
+                                               if(!CQlocate(sname, cpname)) { 
//if the continuous procedure is not registered in the catalog then we register 
it
+                                                       cntxt = 
MCgetClient(sql->clientid);
+                                                       petrinetResponse = 
CQregisterInternal(cntxt, sname, cpname);
+                                               }
+                                               if(!petrinetResponse) {
+                                                       petrinetResponse = 
CQresumeInternal(sname, cpname);
+                                               }
+                                       }
+                                       break;
+                               case INTERRUPT_CONTINUOUS_PROCEDURE:
+                                       petrinetResponse = 
CQpauseInternal(sname, cpname);
+                                       break;
+                               case HALT_CONTINUOUS_PROCEDURE:
+                                       petrinetResponse = 
CQderegisterInternal(sname, cpname);
+                                       break;
+                       }
+                       if(petrinetResponse) {
+                               return sql_message("3F000!%s: internal error: 
%s", F, petrinetResponse);
+                       } else {
+                               mvc_continuous_procedure(sql, s, func);
+                       }
                }
        } else {
-               return sql_message("3F000!%s CONTINUOUS PROCEDURE: could not 
find continuous procedure %s in the catalog", F, cpname);
+               return sql_message("3F000!%s: could not find continuous 
procedure %s in the catalog", F, cpname);
        }
        return MAL_SUCCEED;
 }
@@ -810,7 +833,7 @@ UPGcreate_view(Client cntxt, MalBlkPtr m
                return msg;
        if ((msg = checkSQLContext(cntxt)) != NULL)
                return msg;
-                                                              
+
        osname = cur_schema(sql)->base.name;
        mvc_set_schema(sql, sname);
        s = sql_parse(be, sa_create(), view, 0);
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -246,7 +246,7 @@ wrapup:
        throw(SQL,"cquery.status",MAL_MALLOC_FAIL);
 }
 
-static int
+int
 CQlocate(str modname, str fcnname)
 {
        int i;
@@ -272,7 +272,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS
 
        idx = CQlocate(sch, fcn);
        if( idx == pnettop)
-               throw(SQL,"cquery.error","Continous query %s.%s not 
accessible\n",sch,fcn);
+               throw(SQL,"cquery.error","Continuous procedure %s.%s not 
accessible\n",sch,fcn);
 
        pnet[idx].error = GDKstrdup(error);
        return MAL_SUCCEED;
@@ -291,7 +291,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt
 
        idx = CQlocate(sch, fcn);
        if( idx == pnettop)
-               throw(SQL,"cquery.show","Continous query %s.%s not 
accessible\n",sch,fcn);
+               throw(SQL,"cquery.show","Continuous procedure %s.%s not 
accessible\n",sch,fcn);
 
        printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME | 
LIST_MAL_VALUE  | LIST_MAL_MAPI);
        return MAL_SUCCEED;
@@ -387,68 +387,61 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
 static str
 IOTprocedureStmt(Client cntxt, MalBlkPtr mb, str schema, str nme)
 {
-    mvc *m = NULL;
-    str msg = MAL_SUCCEED;
-    sql_schema  *s;
-    backend *be;
-    node *o;
-    sql_func *f;
-    /*sql_trans *tr;*/
+       mvc *m = NULL;
+       str msg = MAL_SUCCEED;
+       sql_schema  *s;
+       backend *be;
+       node *o;
+       sql_func *f;
+       /*sql_trans *tr;*/
 
-    msg = getSQLContext(cntxt, mb, &m, NULL);
-    if ((msg = checkSQLContext(cntxt)) != MAL_SUCCEED)
-        return msg;
-    s = mvc_bind_schema(m, schema);
-    if (s == NULL)
-        throw(SQL, "cquery.register", "Schema missing");
-    /*tr = m->session->tr;*/
-    for (o = s->funcs.set->h; o; o = o->next) {
-        f = o->data;
-        if (strcmp(f->base.name, nme) == 0) {
-            be = (void *) backend_create(m, cntxt);
-            if ( be->mvc->sa == NULL)
-                be->mvc->sa = sa_create();
-            //TODO fix result type
-            backend_create_func(be, f, f->res,NULL);
-            return MAL_SUCCEED;
-        }
-    }
-    throw(SQL, "cquery.register", "SQL procedure missing");
+       msg = getSQLContext(cntxt, mb, &m, NULL);
+       if ((msg = checkSQLContext(cntxt)) != MAL_SUCCEED)
+               return msg;
+       s = mvc_bind_schema(m, schema);
+       if (s == NULL)
+               throw(SQL, "cquery.register", "Schema missing");
+       /*tr = m->session->tr;*/
+       for (o = s->funcs.set->h; o; o = o->next) {
+               f = o->data;
+               if (strcmp(f->base.name, nme) == 0) {
+                       be = (void *) backend_create(m, cntxt);
+                       if ( be->mvc->sa == NULL)
+                               be->mvc->sa = sa_create();
+                       //TODO fix result type
+                       backend_create_func(be, f, f->res,NULL);
+                       return MAL_SUCCEED;
+               }
+       }
+       throw(SQL, "cquery.register", "SQL procedure missing");
 }
 
-/*str
-CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+str
+CQregisterInternal(Client cntxt, str modnme, str fcnnme)
 {
        int i;
        InstrPtr sig,q;
        str msg = MAL_SUCCEED;
-       MalBlkPtr nmb;
+       MalBlkPtr mb, nmb;
+       Module scope;
        Symbol s = NULL;
-       Module scope;
        char buf[IDLENGTH];
 
-       str modnme = *getArgReference_str(stk, pci, 1);
-       str fcnnme = *getArgReference_str(stk, pci, 2);
-
-    msg = IOTprocedureStmt(cntxt, mb, modnme, fcnnme);
-       if( msg)
-               return msg;
-
        scope = findModule(cntxt->nspace, putName(modnme));
        if (scope)
                s = findSymbolInModule(scope, putName(fcnnme));
 
        if (s == NULL)
-               throw(MAL, "cquery.register", "Could not find SQL procedure\n");
+               throw(MAL, "cquery.register", "Could not find SQL procedure");
 
-       if (pnettop == MAXCQ) 
+       if (pnettop == MAXCQ)
                GDKerror("cquery.register:Too many transitions");
 
        mb = s->def;
        sig = getInstrPtr(mb,0);
        i = CQlocate(getModuleId(sig), getFunctionId(sig));
        if (i != pnettop)
-               throw(MAL,"cquery.register","Duplicate registration of cquery");
+               throw(MAL,"cquery.register","Duplicate registration of 
continuous procedure");
 
 #ifdef DEBUG_CQUERY
        fprintf(stderr, "#cquery register %s.%s\n", 
getModuleId(sig),getFunctionId(sig));
@@ -460,9 +453,9 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        s = newFunction(userRef, putName(buf), FUNCTIONsymbol);
        nmb = s->def;
        setArgType(nmb, nmb->stmt[0],0, TYPE_void);
-    (void) newStmt(nmb, sqlRef, transactionRef);
+       (void) newStmt(nmb, sqlRef, transactionRef);
        (void) newStmt(nmb, getModuleId(sig),getFunctionId(sig));
-    q = newStmt(nmb, sqlRef, commitRef);
+       q = newStmt(nmb, sqlRef, commitRef);
        setArgType(nmb,q, 0, TYPE_void);
        pushEndInstruction(nmb);
        chkProgram(cntxt->fdout, cntxt->nspace, nmb);
@@ -473,14 +466,14 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        MT_lock_set(&ttrLock);
        if( CQlocate(getModuleId(sig), getFunctionId(sig)) != pnettop){
                freeSymbol(s);
-               throw(MAL,"cquery.register","Duplicate registration of cquery");
+               throw(MAL,"cquery.register","Duplicate registration of 
continuous procedure");
        }
        pnet[pnettop].mod = GDKstrdup(modnme);
        pnet[pnettop].fcn = GDKstrdup(fcnnme);
        pnet[pnettop].mb = nmb;
        pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
 
-       pnet[pnettop].cycles = int_nil; 
+       pnet[pnettop].cycles = int_nil;
        pnet[pnettop].beats = lng_nil;
        pnet[pnettop].run  = lng_nil;
        pnet[pnettop].seen = *timestamp_nil;
@@ -495,119 +488,33 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        if( pnettop == 0)
                pnstatus = CQSTOP;
        return msg;
-}*/
-
-str
-CQregisterInternal(Client cntxt, str modnme, str fcnnme)
-{
-    int i;
-    InstrPtr sig,q;
-    str msg = MAL_SUCCEED;
-    MalBlkPtr mb, nmb;
-       Module scope;
-       Symbol s = NULL;
-    char buf[IDLENGTH];
-
-       scope = findModule(cntxt->nspace, putName(modnme));
-       if (scope)
-               s = findSymbolInModule(scope, putName(fcnnme));
-
-       if (s == NULL)
-               throw(MAL, "cquery.register", "Could not find SQL procedure");
-
-    if (pnettop == MAXCQ)
-        GDKerror("cquery.register:Too many transitions");
-
-    mb = s->def;
-    sig = getInstrPtr(mb,0);
-    i = CQlocate(getModuleId(sig), getFunctionId(sig));
-    if (i != pnettop)
-        throw(MAL,"cquery.register","Duplicate registration of cquery");
-
-#ifdef DEBUG_CQUERY
-    fprintf(stderr, "#cquery register %s.%s\n", 
getModuleId(sig),getFunctionId(sig));
-       fprintFunction(stderr,mb,0,LIST_MAL_ALL);
-#endif
-    memset((void*) (pnet+pnettop), 0, sizeof(CQnode));
-
-    snprintf(buf,IDLENGTH,"%s_%s",modnme,fcnnme);
-    s = newFunction(userRef, putName(buf), FUNCTIONsymbol);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to