Changeset: 6bd0c0fd359a for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6bd0c0fd359a Modified Files: sql/backends/monet5/sql_cat.c sql/backends/monet5/sql_cquery.c sql/backends/monet5/sql_cquery.h sql/backends/monet5/sqlcatalog.mal sql/include/sql_catalog.h sql/server/rel_psm.c sql/server/rel_semantic.c sql/server/sql_mvc.c sql/server/sql_mvc.h sql/server/sql_parser.h sql/server/sql_parser.y sql/server/sql_scan.c sql/storage/sql_storage.h sql/storage/store.c Branch: timetrails Log Message:
Added 'START CONTINUOUS PROCEDURE qname', 'INTERRUPT CONTINUOUS PROCEDURE qname' and 'HALT CONTINUOUS PROCEDURE qname' SQL statements. For now we use INTERRUPT and HALT instead of PAUSE and STOP respectively, because these words are used in the system functions sys.pause and sys.stop diffs (truncated from 912 to 300 lines): diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c --- a/sql/backends/monet5/sql_cat.c +++ b/sql/backends/monet5/sql_cat.c @@ -24,6 +24,7 @@ #include "opt_prelude.h" #include "querylog.h" #include "mal_builder.h" +#include "mal_client.h" #include "mal_debugger.h" #include <rel_select.h> @@ -48,7 +49,7 @@ static char * SaveArgReference(MalStkPtr stk, InstrPtr pci, int arg) { char *val = *getArgReference_str(stk, pci, arg); - + if (val && strcmp(val, str_nil) == 0) val = NULL; return val; @@ -712,36 +713,58 @@ continuous_procedure(mvc *sql, char *sna { sql_schema *s = NULL; char *F; + Client cntxt; + str petrinetResponse = MAL_SUCCEED; switch (action) { case START_CONTINUOUS_PROCEDURE: F = "START CONTINUOUS PROCEDURE"; break; - case PAUSE_CONTINUOUS_PROCEDURE: - F = "PAUSE CONTINUOUS PROCEDURE"; + case INTERRUPT_CONTINUOUS_PROCEDURE: + F = "INTERRUPT CONTINUOUS PROCEDURE"; break; - case STOP_CONTINUOUS_PROCEDURE: - F = "STOP CONTINUOUS PROCEDURE"; + case HALT_CONTINUOUS_PROCEDURE: + F = "HALT CONTINUOUS PROCEDURE"; break; } if (sname && !(s = mvc_bind_schema(sql, sname))) - return sql_message("3F000!%s CONTINUOUS PROCEDURE: no such schema '%s'", F, sname); + return sql_message("3F000!%s: no such schema '%s'", F, sname); if (!s) s = cur_schema(sql); if (fid >= 0) { node *n = find_sql_func_node(s, fid); if (n) { sql_func *func = n->data; - if (!mvc_schema_privs(sql, s)) { - return sql_message("%s: access denied for %s to schema ;'%s'", F, stack_get_string(sql, "current_user"), s->base.name); + return sql_message("3F000!%s: access denied for %s to schema ;'%s'", F, stack_get_string(sql, "current_user"), s->base.name); } - - mvc_continuous_procedure(sql, s, func, action); + switch (action) { + case START_CONTINUOUS_PROCEDURE: { + if(!CQlocate(sname, cpname)) { //if the continuous procedure is not registered in the catalog then we register it + cntxt = MCgetClient(sql->clientid); + petrinetResponse = CQregisterInternal(cntxt, sname, cpname); + } + if(!petrinetResponse) { + petrinetResponse = CQresumeInternal(sname, cpname); + } + } + break; + case INTERRUPT_CONTINUOUS_PROCEDURE: + petrinetResponse = CQpauseInternal(sname, cpname); + break; + case HALT_CONTINUOUS_PROCEDURE: + petrinetResponse = CQderegisterInternal(sname, cpname); + break; + } + if(petrinetResponse) { + return sql_message("3F000!%s: internal error: %s", F, petrinetResponse); + } else { + mvc_continuous_procedure(sql, s, func); + } } } else { - return sql_message("3F000!%s CONTINUOUS PROCEDURE: could not find continuous procedure %s in the catalog", F, cpname); + return sql_message("3F000!%s: could not find continuous procedure %s in the catalog", F, cpname); } return MAL_SUCCEED; } @@ -810,7 +833,7 @@ UPGcreate_view(Client cntxt, MalBlkPtr m return msg; if ((msg = checkSQLContext(cntxt)) != NULL) return msg; - + osname = cur_schema(sql)->base.name; mvc_set_schema(sql, sname); s = sql_parse(be, sa_create(), view, 0); diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -246,7 +246,7 @@ wrapup: throw(SQL,"cquery.status",MAL_MALLOC_FAIL); } -static int +int CQlocate(str modname, str fcnname) { int i; @@ -272,7 +272,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS idx = CQlocate(sch, fcn); if( idx == pnettop) - throw(SQL,"cquery.error","Continous query %s.%s not accessible\n",sch,fcn); + throw(SQL,"cquery.error","Continuous procedure %s.%s not accessible\n",sch,fcn); pnet[idx].error = GDKstrdup(error); return MAL_SUCCEED; @@ -291,7 +291,7 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt idx = CQlocate(sch, fcn); if( idx == pnettop) - throw(SQL,"cquery.show","Continous query %s.%s not accessible\n",sch,fcn); + throw(SQL,"cquery.show","Continuous procedure %s.%s not accessible\n",sch,fcn); printFunction(cntxt->fdout, pnet[idx].mb, 0, LIST_MAL_NAME | LIST_MAL_VALUE | LIST_MAL_MAPI); return MAL_SUCCEED; @@ -387,68 +387,61 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i static str IOTprocedureStmt(Client cntxt, MalBlkPtr mb, str schema, str nme) { - mvc *m = NULL; - str msg = MAL_SUCCEED; - sql_schema *s; - backend *be; - node *o; - sql_func *f; - /*sql_trans *tr;*/ + mvc *m = NULL; + str msg = MAL_SUCCEED; + sql_schema *s; + backend *be; + node *o; + sql_func *f; + /*sql_trans *tr;*/ - msg = getSQLContext(cntxt, mb, &m, NULL); - if ((msg = checkSQLContext(cntxt)) != MAL_SUCCEED) - return msg; - s = mvc_bind_schema(m, schema); - if (s == NULL) - throw(SQL, "cquery.register", "Schema missing"); - /*tr = m->session->tr;*/ - for (o = s->funcs.set->h; o; o = o->next) { - f = o->data; - if (strcmp(f->base.name, nme) == 0) { - be = (void *) backend_create(m, cntxt); - if ( be->mvc->sa == NULL) - be->mvc->sa = sa_create(); - //TODO fix result type - backend_create_func(be, f, f->res,NULL); - return MAL_SUCCEED; - } - } - throw(SQL, "cquery.register", "SQL procedure missing"); + msg = getSQLContext(cntxt, mb, &m, NULL); + if ((msg = checkSQLContext(cntxt)) != MAL_SUCCEED) + return msg; + s = mvc_bind_schema(m, schema); + if (s == NULL) + throw(SQL, "cquery.register", "Schema missing"); + /*tr = m->session->tr;*/ + for (o = s->funcs.set->h; o; o = o->next) { + f = o->data; + if (strcmp(f->base.name, nme) == 0) { + be = (void *) backend_create(m, cntxt); + if ( be->mvc->sa == NULL) + be->mvc->sa = sa_create(); + //TODO fix result type + backend_create_func(be, f, f->res,NULL); + return MAL_SUCCEED; + } + } + throw(SQL, "cquery.register", "SQL procedure missing"); } -/*str -CQregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +str +CQregisterInternal(Client cntxt, str modnme, str fcnnme) { int i; InstrPtr sig,q; str msg = MAL_SUCCEED; - MalBlkPtr nmb; + MalBlkPtr mb, nmb; + Module scope; Symbol s = NULL; - Module scope; char buf[IDLENGTH]; - str modnme = *getArgReference_str(stk, pci, 1); - str fcnnme = *getArgReference_str(stk, pci, 2); - - msg = IOTprocedureStmt(cntxt, mb, modnme, fcnnme); - if( msg) - return msg; - scope = findModule(cntxt->nspace, putName(modnme)); if (scope) s = findSymbolInModule(scope, putName(fcnnme)); if (s == NULL) - throw(MAL, "cquery.register", "Could not find SQL procedure\n"); + throw(MAL, "cquery.register", "Could not find SQL procedure"); - if (pnettop == MAXCQ) + if (pnettop == MAXCQ) GDKerror("cquery.register:Too many transitions"); mb = s->def; sig = getInstrPtr(mb,0); i = CQlocate(getModuleId(sig), getFunctionId(sig)); if (i != pnettop) - throw(MAL,"cquery.register","Duplicate registration of cquery"); + throw(MAL,"cquery.register","Duplicate registration of continuous procedure"); #ifdef DEBUG_CQUERY fprintf(stderr, "#cquery register %s.%s\n", getModuleId(sig),getFunctionId(sig)); @@ -460,9 +453,9 @@ CQregister(Client cntxt, MalBlkPtr mb, M s = newFunction(userRef, putName(buf), FUNCTIONsymbol); nmb = s->def; setArgType(nmb, nmb->stmt[0],0, TYPE_void); - (void) newStmt(nmb, sqlRef, transactionRef); + (void) newStmt(nmb, sqlRef, transactionRef); (void) newStmt(nmb, getModuleId(sig),getFunctionId(sig)); - q = newStmt(nmb, sqlRef, commitRef); + q = newStmt(nmb, sqlRef, commitRef); setArgType(nmb,q, 0, TYPE_void); pushEndInstruction(nmb); chkProgram(cntxt->fdout, cntxt->nspace, nmb); @@ -473,14 +466,14 @@ CQregister(Client cntxt, MalBlkPtr mb, M MT_lock_set(&ttrLock); if( CQlocate(getModuleId(sig), getFunctionId(sig)) != pnettop){ freeSymbol(s); - throw(MAL,"cquery.register","Duplicate registration of cquery"); + throw(MAL,"cquery.register","Duplicate registration of continuous procedure"); } pnet[pnettop].mod = GDKstrdup(modnme); pnet[pnettop].fcn = GDKstrdup(fcnnme); pnet[pnettop].mb = nmb; pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize); - pnet[pnettop].cycles = int_nil; + pnet[pnettop].cycles = int_nil; pnet[pnettop].beats = lng_nil; pnet[pnettop].run = lng_nil; pnet[pnettop].seen = *timestamp_nil; @@ -495,119 +488,33 @@ CQregister(Client cntxt, MalBlkPtr mb, M if( pnettop == 0) pnstatus = CQSTOP; return msg; -}*/ - -str -CQregisterInternal(Client cntxt, str modnme, str fcnnme) -{ - int i; - InstrPtr sig,q; - str msg = MAL_SUCCEED; - MalBlkPtr mb, nmb; - Module scope; - Symbol s = NULL; - char buf[IDLENGTH]; - - scope = findModule(cntxt->nspace, putName(modnme)); - if (scope) - s = findSymbolInModule(scope, putName(fcnnme)); - - if (s == NULL) - throw(MAL, "cquery.register", "Could not find SQL procedure"); - - if (pnettop == MAXCQ) - GDKerror("cquery.register:Too many transitions"); - - mb = s->def; - sig = getInstrPtr(mb,0); - i = CQlocate(getModuleId(sig), getFunctionId(sig)); - if (i != pnettop) - throw(MAL,"cquery.register","Duplicate registration of cquery"); - -#ifdef DEBUG_CQUERY - fprintf(stderr, "#cquery register %s.%s\n", getModuleId(sig),getFunctionId(sig)); - fprintFunction(stderr,mb,0,LIST_MAL_ALL); -#endif - memset((void*) (pnet+pnettop), 0, sizeof(CQnode)); - - snprintf(buf,IDLENGTH,"%s_%s",modnme,fcnnme); - s = newFunction(userRef, putName(buf), FUNCTIONsymbol); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list