Changeset: e96193b206f6 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e96193b206f6 Added Files: sql/backends/monet5/Tests/cfunction01.stable.err sql/backends/monet5/Tests/cfunction01.stable.out Modified Files: monetdb5/optimizer/opt_cquery.c sql/backends/monet5/Tests/cfunction01.sql sql/backends/monet5/sql_basket.c sql/backends/monet5/sql_gencode.c Branch: trails Log Message:
Add basket unlock and basket tumble instructions before return and yield barriers. diffs (truncated from 466 to 300 lines): diff --git a/monetdb5/optimizer/opt_cquery.c b/monetdb5/optimizer/opt_cquery.c --- a/monetdb5/optimizer/opt_cquery.c +++ b/monetdb5/optimizer/opt_cquery.c @@ -45,14 +45,12 @@ OPTcqueryImplementation(Client cntxt, Ma { int i, j, k, fnd, limit, slimit, extra_stmts = 0; InstrPtr r, p, *old; - int *alias; + int *alias = NULL; str schemas[MAXBSKTOPT]; str tables[MAXBSKTOPT]; int input[MAXBSKTOPT]= {0}; int output[MAXBSKTOPT]= {0}; - int btop=0, lastmvc=0; - int noerror=0; - int mvcseen = 0; + int btop=0, lastmvc=0, lastrealmvc=0, manymvc=0, retseen = 0, noerror=0, mvcseen = 0; int cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0; char buf[256]; lng usec = GDKusec(); @@ -138,17 +136,6 @@ OPTcqueryImplementation(Client cntxt, Ma lastmvc = getArg(p,0); if( getModuleId(p)== cqueryRef && getFunctionId(p) == tumbleRef ) lastmvc = getArg(p,1); - /*if( getModuleId(p) == sqlRef && getFunctionId(p)== mvcRef ){ - if( mvcseen){ - extra_stmts++; - } - mvcseen=1; - extra_stmts += 2; - } - if( getModuleId(p)== cqueryRef && getFunctionId(p)==errorRef ) - noerror++; - if( p->token == ENDsymbol && btop > 0 && noerror==0 ) - extra_stmts += 2;*/ } #ifdef DEBUG_OPT_CQUERY mnstr_printf(cntxt->fdout, "#cquery optimizer started with %d streams, mvc %d\n", btop,lastmvc); @@ -162,12 +149,12 @@ OPTcqueryImplementation(Client cntxt, Ma if (alias == 0) return MAL_SUCCEED; - if (newMalBlkStmt(mb, slimit + extra_stmts) < 0) + if (newMalBlkStmt(mb, slimit + extra_stmts) < 0) { + GDKfree(alias); return MAL_SUCCEED; + } pushInstruction(mb, old[0]); - /*mvcseen = 0; - noerror = 0;*/ for (i = 1; i < limit; i++) if (old[i]) { p = old[i]; @@ -178,18 +165,8 @@ OPTcqueryImplementation(Client cntxt, Ma } if(getModuleId(p) == sqlRef && getFunctionId(p)== mvcRef){ pushInstruction(mb,p); - lastmvc = getArg(p,0); - // watch out for second transaction in same block - if( mvcseen){ - // unlock the tables - for( j=btop-1; j>= 0; j--){ - r= newStmt(mb,basketRef,unlockRef); - r= pushArgument(mb,r,lastmvc); - r= pushStr(mb,r, schemas[j]); - r= pushStr(mb,r, tables[j]); - lastmvc= getArg(r,0); - } - } + lastmvc = lastrealmvc = getArg(p,0); + manymvc++; // register and lock all baskets used for( j=0; j<btop; j++){ p= newStmt(mb,basketRef,registerRef); @@ -199,7 +176,7 @@ OPTcqueryImplementation(Client cntxt, Ma p= pushInt(mb,p, output[j]); alias[lastmvc] = getArg(p,0); lastmvc = getArg(p,0); - + p= newStmt(mb,basketRef,lockRef); p= pushArgument(mb,p,lastmvc); p= pushStr(mb,p, schemas[j]); @@ -240,59 +217,65 @@ OPTcqueryImplementation(Client cntxt, Ma if( getModuleId(p)== cqueryRef && getFunctionId(p)==errorRef) noerror++; - if (p->token == ENDsymbol && btop > 0 && noerror==0) { - // empty all baskets used only when we are optimizing a cq - for(j = 0; j < btop; j++) - if( input[j] && !output[j] ){ - r = newStmt(mb, basketRef, tumbleRef); - r = pushArgument(mb,r, lastmvc); - r = pushStr(mb,r, schemas[j]); - r = pushStr(mb,r, tables[j]); - lastmvc = getArg(r,0); + if ((p->barrier == YIELDsymbol || p->barrier == RETURNsymbol || p->token == ENDsymbol) && btop > 0) { + + if(p->barrier == YIELDsymbol || p->barrier == RETURNsymbol) + retseen = 1; + + if(p->token != ENDsymbol || !retseen) { + // watch out for second transaction in same block + if( mvcseen){ + // unlock the tables + for( j=btop-1; j>= 0; j--){ + r= newStmt(mb,basketRef,unlockRef); + r= pushArgument(mb,r,lastmvc); + r= pushStr(mb,r, schemas[j]); + r= pushStr(mb,r, tables[j]); + lastmvc= getArg(r,0); + } + } + // empty all baskets used only when we are optimizing a cq + for(j = 0; j < btop; j++) + if( input[j] && !output[j] ){ + r = newStmt(mb, basketRef, tumbleRef); + r = pushArgument(mb,r, lastmvc); + r = pushStr(mb,r, schemas[j]); + r = pushStr(mb,r, tables[j]); + lastmvc = getArg(r,0); + } } - /* catch any exception left behind */ - r = newAssignment(mb); - j = getArg(r, 0) = newVariable(mb, "SQLexception", 12, TYPE_str); - setVarUDFtype(mb, j); - r->barrier = CATCHsymbol; + if (p->token == ENDsymbol && noerror==0) { + /* catch any exception left behind */ + r = newAssignment(mb); + j = getArg(r, 0) = newVariable(mb, "SQLexception", 12, TYPE_str); + setVarUDFtype(mb, j); + r->barrier = CATCHsymbol; - r = newStmt(mb,basketRef, errorRef); - r = pushStr(mb, r, getModuleId(old[0])); - r = pushStr(mb, r, getFunctionId(old[0])); - r = pushArgument(mb, r, j); - - r = newAssignment(mb); - getArg(r, 0) = j; - r->barrier = EXITsymbol; - r = newAssignment(mb); - j = getArg(r, 0) = newVariable(mb, "MALexception",12, TYPE_str); - setVarUDFtype(mb, j); - r->barrier = CATCHsymbol; + r = newStmt(mb,basketRef, errorRef); + r = pushStr(mb, r, getModuleId(old[0])); + r = pushStr(mb, r, getFunctionId(old[0])); + r = pushArgument(mb, r, j); - r = newStmt(mb,basketRef, errorRef); - r = pushStr(mb, r, getModuleId(old[0])); - r = pushStr(mb, r, getFunctionId(old[0])); - r = pushArgument(mb, r, j); - - r = newAssignment(mb); - getArg(r, 0) = j; - r->barrier = EXITsymbol; + r = newAssignment(mb); + getArg(r, 0) = j; + r->barrier = EXITsymbol; + r = newAssignment(mb); + j = getArg(r, 0) = newVariable(mb, "MALexception",12, TYPE_str); + setVarUDFtype(mb, j); + r->barrier = CATCHsymbol; - /* non-contiguous queries call for releasing the lock on the basket */ - for( j=btop-1; j>= 0; j--){ - r= newStmt(mb,basketRef,unlockRef); - r= pushArgument(mb,r,lastmvc); - r= pushStr(mb,r, schemas[j]); - r= pushStr(mb,r, tables[j]); - lastmvc= getArg(r,0); + r = newStmt(mb,basketRef, errorRef); + r = pushStr(mb, r, getModuleId(old[0])); + r = pushStr(mb, r, getFunctionId(old[0])); + r = pushArgument(mb, r, j); + + r = newAssignment(mb); + getArg(r, 0) = j; + r->barrier = EXITsymbol; + + break; } - //p= newStmt(mb,basketRef,commitRef); - //p= pushArgument(mb,p, lastmvc); - //p= pushStr(mb,p, schemas[j]); - //p= pushStr(mb,p, tables[j]); - //lastmvc = getArg(p,0); - break; } for (j = 0; j < p->argc; j++) @@ -310,6 +293,24 @@ OPTcqueryImplementation(Client cntxt, Ma pushInstruction(mb, p); } + // a basket lock is performed for each sql.mvc, but we need to remove the last one if there are more than 1 sql.mvc + // in the generated MAL plan + if(manymvc > 1) { + for (j = mb->stop - 1; j >= 0; j--) { + p = getInstrPtr(mb, j); + if(getModuleId(p) == sqlRef && getFunctionId(p) == mvcRef) { + break; + } else if(getModuleId(p) == basketRef && getFunctionId(p) == unlockRef) { + getArg(p,1) = lastrealmvc; + } else if(getModuleId(p) == basketRef && + (getFunctionId(p) == registerRef || getFunctionId(p) == lockRef || getFunctionId(p) == tumbleRef)) { + removeInstruction(mb, p); + freeInstruction(p); + mb->stmt[mb->stop] = NULL; + } + } + } + /* take the remainder as is */ for (; i<limit; i++) if (old[i]) diff --git a/sql/backends/monet5/Tests/cfunction01.sql b/sql/backends/monet5/Tests/cfunction01.sql --- a/sql/backends/monet5/Tests/cfunction01.sql +++ b/sql/backends/monet5/Tests/cfunction01.sql @@ -16,7 +16,7 @@ END; select * from functions where name ='aggr01'; -- a continuous function can be called used like any other function? -select aggr01(); -- causes error +select aggr01(); #should return 0 start continuous function aggr01(); call cquery.wait(1000); #wait to be started @@ -35,6 +35,7 @@ insert into ftmp values(2),(2); insert into ftmp values(3),(3); resume continuous aggr01; +call cquery.wait(1000); select aggr01(); #should return 6 call cquery.wait(1000); diff --git a/sql/backends/monet5/Tests/cfunction01.stable.err b/sql/backends/monet5/Tests/cfunction01.stable.err new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/Tests/cfunction01.stable.err @@ -0,0 +1,34 @@ +stderr of test 'cfunction01` in directory 'sql/backends/monet5` itself: + + +# 16:27:02 > +# 16:27:02 > "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" "mapi_open=true" "--set" "mapi_port=36085" "--set" "mapi_usock=/var/tmp/mtest-2361/.s.monetdb.36085" "--set" "monet_prompt=" "--forcemito" "--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5" +# 16:27:02 > + +# builtin opt gdk_dbpath = /home/ferreira/MonetDB-trails/BUILD/var/monetdb5/dbfarm/demo +# builtin opt gdk_debug = 0 +# builtin opt gdk_vmtrim = no +# builtin opt monet_prompt = > +# builtin opt monet_daemon = no +# builtin opt mapi_port = 50000 +# builtin opt mapi_open = false +# builtin opt mapi_autosense = false +# builtin opt sql_optimizer = default_pipe +# builtin opt sql_debug = 0 +# cmdline opt gdk_nr_threads = 0 +# cmdline opt mapi_open = true +# cmdline opt mapi_port = 36085 +# cmdline opt mapi_usock = /var/tmp/mtest-2361/.s.monetdb.36085 +# cmdline opt monet_prompt = +# cmdline opt gdk_dbpath = /home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5 +# cmdline opt gdk_debug = 536870922 + +# 16:27:02 > +# 16:27:02 > "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" "--host=/var/tmp/mtest-2361" "--port=36085" +# 16:27:02 > + + +# 16:27:07 > +# 16:27:07 > "Done." +# 16:27:07 > + diff --git a/sql/backends/monet5/Tests/cfunction01.stable.out b/sql/backends/monet5/Tests/cfunction01.stable.out new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/Tests/cfunction01.stable.out @@ -0,0 +1,136 @@ +stdout of test 'cfunction01` in directory 'sql/backends/monet5` itself: + + +# 16:27:02 > +# 16:27:02 > "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" "mapi_open=true" "--set" "mapi_port=36085" "--set" "mapi_usock=/var/tmp/mtest-2361/.s.monetdb.36085" "--set" "monet_prompt=" "--forcemito" "--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5" +# 16:27:02 > + +# MonetDB 5 server v11.28.0 +# This is an unreleased version +# Serving database 'mTests_sql_backends_monet5', using 8 threads +# Compiled for x86_64-pc-linux-gnu/64bit with 128bit integers +# Found 15.498 GiB available main-memory. +# Copyright (c) 1993-July 2008 CWI. +# Copyright (c) August 2008-2017 MonetDB B.V., all rights reserved _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list