Changeset: e96193b206f6 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e96193b206f6
Added Files:
        sql/backends/monet5/Tests/cfunction01.stable.err
        sql/backends/monet5/Tests/cfunction01.stable.out
Modified Files:
        monetdb5/optimizer/opt_cquery.c
        sql/backends/monet5/Tests/cfunction01.sql
        sql/backends/monet5/sql_basket.c
        sql/backends/monet5/sql_gencode.c
Branch: trails
Log Message:

Add basket unlock and basket tumble instructions before return and yield 
barriers.


diffs (truncated from 466 to 300 lines):

diff --git a/monetdb5/optimizer/opt_cquery.c b/monetdb5/optimizer/opt_cquery.c
--- a/monetdb5/optimizer/opt_cquery.c
+++ b/monetdb5/optimizer/opt_cquery.c
@@ -45,14 +45,12 @@ OPTcqueryImplementation(Client cntxt, Ma
 {
        int i, j, k, fnd, limit, slimit, extra_stmts = 0;
        InstrPtr r, p, *old;
-       int *alias;
+       int *alias = NULL;
        str schemas[MAXBSKTOPT];
        str tables[MAXBSKTOPT];
        int input[MAXBSKTOPT]= {0};
        int output[MAXBSKTOPT]= {0};
-       int btop=0, lastmvc=0;
-       int noerror=0;
-       int mvcseen = 0;
+       int btop=0, lastmvc=0, lastrealmvc=0, manymvc=0, retseen = 0, 
noerror=0, mvcseen = 0;
        int cq= strncmp(getFunctionId(getInstrPtr(mb,0)),"cq",2) == 0;
        char buf[256];
        lng usec = GDKusec();
@@ -138,17 +136,6 @@ OPTcqueryImplementation(Client cntxt, Ma
                        lastmvc = getArg(p,0);
                if( getModuleId(p)== cqueryRef && getFunctionId(p) == tumbleRef 
)
                        lastmvc = getArg(p,1);
-               /*if( getModuleId(p) == sqlRef && getFunctionId(p)== mvcRef ){
-                       if( mvcseen){
-                               extra_stmts++;
-                       }
-                       mvcseen=1;
-                       extra_stmts += 2;
-               }
-               if( getModuleId(p)== cqueryRef && getFunctionId(p)==errorRef )
-                       noerror++;
-               if( p->token == ENDsymbol && btop > 0 && noerror==0 )
-                       extra_stmts += 2;*/
        }
 #ifdef DEBUG_OPT_CQUERY
        mnstr_printf(cntxt->fdout, "#cquery optimizer started with %d streams, 
mvc %d\n", btop,lastmvc);
@@ -162,12 +149,12 @@ OPTcqueryImplementation(Client cntxt, Ma
        if (alias == 0)
                return MAL_SUCCEED;
 
-       if (newMalBlkStmt(mb, slimit + extra_stmts) < 0)
+       if (newMalBlkStmt(mb, slimit + extra_stmts) < 0) {
+               GDKfree(alias);
                return MAL_SUCCEED;
+       }
 
        pushInstruction(mb, old[0]);
-       /*mvcseen = 0;
-       noerror = 0;*/
        for (i = 1; i < limit; i++)
                if (old[i]) {
                        p = old[i];
@@ -178,18 +165,8 @@ OPTcqueryImplementation(Client cntxt, Ma
                        }
                        if(getModuleId(p) == sqlRef && getFunctionId(p)== 
mvcRef){
                                pushInstruction(mb,p);
-                               lastmvc = getArg(p,0);
-                               // watch out for second transaction in same 
block
-                               if( mvcseen){
-                                       // unlock the tables
-                                       for( j=btop-1; j>= 0; j--){
-                                               r= 
newStmt(mb,basketRef,unlockRef);
-                                               r= pushArgument(mb,r,lastmvc);
-                                               r= pushStr(mb,r, schemas[j]);
-                                               r= pushStr(mb,r, tables[j]);
-                                               lastmvc= getArg(r,0);
-                                       }
-                               }
+                               lastmvc = lastrealmvc = getArg(p,0);
+                               manymvc++;
                                // register and lock all baskets used
                                for( j=0; j<btop; j++){
                                        p= newStmt(mb,basketRef,registerRef);
@@ -199,7 +176,7 @@ OPTcqueryImplementation(Client cntxt, Ma
                                        p= pushInt(mb,p, output[j]);
                                        alias[lastmvc] = getArg(p,0);
                                        lastmvc = getArg(p,0);
-       
+
                                        p= newStmt(mb,basketRef,lockRef);
                                        p= pushArgument(mb,p,lastmvc);
                                        p= pushStr(mb,p, schemas[j]);
@@ -240,59 +217,65 @@ OPTcqueryImplementation(Client cntxt, Ma
 
                        if( getModuleId(p)== cqueryRef && 
getFunctionId(p)==errorRef)
                                noerror++;
-                       if (p->token == ENDsymbol && btop > 0 && noerror==0) {
-                               // empty all baskets used only when we are 
optimizing a cq
-                               for(j = 0; j < btop; j++)
-                               if( input[j] && !output[j] ){
-                                       r =  newStmt(mb, basketRef, tumbleRef);
-                                       r =  pushArgument(mb,r, lastmvc);
-                                       r =  pushStr(mb,r, schemas[j]);
-                                       r =  pushStr(mb,r, tables[j]);
-                                       lastmvc = getArg(r,0);
+                       if ((p->barrier == YIELDsymbol || p->barrier == 
RETURNsymbol || p->token == ENDsymbol) && btop > 0) {
+
+                               if(p->barrier == YIELDsymbol || p->barrier == 
RETURNsymbol)
+                                       retseen = 1;
+
+                               if(p->token != ENDsymbol || !retseen) {
+                                       // watch out for second transaction in 
same block
+                                       if( mvcseen){
+                                               // unlock the tables
+                                               for( j=btop-1; j>= 0; j--){
+                                                       r= 
newStmt(mb,basketRef,unlockRef);
+                                                       r= 
pushArgument(mb,r,lastmvc);
+                                                       r= pushStr(mb,r, 
schemas[j]);
+                                                       r= pushStr(mb,r, 
tables[j]);
+                                                       lastmvc= getArg(r,0);
+                                               }
+                                       }
+                                       // empty all baskets used only when we 
are optimizing a cq
+                                       for(j = 0; j < btop; j++)
+                                               if( input[j] && !output[j] ){
+                                                       r =  newStmt(mb, 
basketRef, tumbleRef);
+                                                       r =  pushArgument(mb,r, 
lastmvc);
+                                                       r =  pushStr(mb,r, 
schemas[j]);
+                                                       r =  pushStr(mb,r, 
tables[j]);
+                                                       lastmvc = getArg(r,0);
+                                       }
                                }
 
-                               /* catch any exception left behind */
-                               r = newAssignment(mb);
-                               j = getArg(r, 0) = newVariable(mb, 
"SQLexception", 12, TYPE_str);
-                               setVarUDFtype(mb, j);
-                               r->barrier = CATCHsymbol;
+                               if (p->token == ENDsymbol && noerror==0) {
+                                       /* catch any exception left behind */
+                                       r = newAssignment(mb);
+                                       j = getArg(r, 0) = newVariable(mb, 
"SQLexception", 12, TYPE_str);
+                                       setVarUDFtype(mb, j);
+                                       r->barrier = CATCHsymbol;
 
-                               r = newStmt(mb,basketRef, errorRef);
-                               r = pushStr(mb, r, getModuleId(old[0]));
-                               r = pushStr(mb, r, getFunctionId(old[0]));
-                               r = pushArgument(mb, r, j);
-
-                               r = newAssignment(mb);
-                               getArg(r, 0) = j;
-                               r->barrier = EXITsymbol;
-                               r = newAssignment(mb);
-                               j = getArg(r, 0) = newVariable(mb, 
"MALexception",12, TYPE_str);
-                               setVarUDFtype(mb, j);
-                               r->barrier = CATCHsymbol;
+                                       r = newStmt(mb,basketRef, errorRef);
+                                       r = pushStr(mb, r, getModuleId(old[0]));
+                                       r = pushStr(mb, r, 
getFunctionId(old[0]));
+                                       r = pushArgument(mb, r, j);
 
-                               r = newStmt(mb,basketRef, errorRef);
-                               r = pushStr(mb, r, getModuleId(old[0]));
-                               r = pushStr(mb, r, getFunctionId(old[0]));
-                               r = pushArgument(mb, r, j);
-
-                               r = newAssignment(mb);
-                               getArg(r, 0) = j;
-                               r->barrier = EXITsymbol;
+                                       r = newAssignment(mb);
+                                       getArg(r, 0) = j;
+                                       r->barrier = EXITsymbol;
+                                       r = newAssignment(mb);
+                                       j = getArg(r, 0) = newVariable(mb, 
"MALexception",12, TYPE_str);
+                                       setVarUDFtype(mb, j);
+                                       r->barrier = CATCHsymbol;
 
-                               /* non-contiguous queries call for releasing 
the lock on the basket */
-                               for( j=btop-1; j>= 0; j--){
-                                       r= newStmt(mb,basketRef,unlockRef);
-                                       r= pushArgument(mb,r,lastmvc);
-                                       r= pushStr(mb,r, schemas[j]);
-                                       r= pushStr(mb,r, tables[j]);
-                                       lastmvc= getArg(r,0);
+                                       r = newStmt(mb,basketRef, errorRef);
+                                       r = pushStr(mb, r, getModuleId(old[0]));
+                                       r = pushStr(mb, r, 
getFunctionId(old[0]));
+                                       r = pushArgument(mb, r, j);
+
+                                       r = newAssignment(mb);
+                                       getArg(r, 0) = j;
+                                       r->barrier = EXITsymbol;
+
+                                       break;
                                }
-                                       //p= newStmt(mb,basketRef,commitRef);
-                                       //p= pushArgument(mb,p, lastmvc);
-                                       //p= pushStr(mb,p, schemas[j]);
-                                       //p= pushStr(mb,p, tables[j]);
-                                       //lastmvc = getArg(p,0);
-                               break;
                        }
 
                        for (j = 0; j < p->argc; j++)
@@ -310,6 +293,24 @@ OPTcqueryImplementation(Client cntxt, Ma
                        pushInstruction(mb, p);
                }
 
+       // a basket lock is performed for each sql.mvc, but we need to remove 
the last one if there are more than 1 sql.mvc
+       // in the generated MAL plan
+       if(manymvc > 1) {
+               for (j = mb->stop - 1; j >= 0; j--) {
+                       p = getInstrPtr(mb, j);
+                       if(getModuleId(p) == sqlRef && getFunctionId(p) == 
mvcRef) {
+                               break;
+                       } else if(getModuleId(p) == basketRef && 
getFunctionId(p) == unlockRef) {
+                               getArg(p,1) = lastrealmvc;
+                       } else if(getModuleId(p) == basketRef &&
+                               (getFunctionId(p) == registerRef || 
getFunctionId(p) == lockRef || getFunctionId(p) == tumbleRef)) {
+                               removeInstruction(mb, p);
+                               freeInstruction(p);
+                               mb->stmt[mb->stop] = NULL;
+                       }
+               }
+       }
+
        /* take the remainder as is */
        for (; i<limit; i++)
                if (old[i])
diff --git a/sql/backends/monet5/Tests/cfunction01.sql 
b/sql/backends/monet5/Tests/cfunction01.sql
--- a/sql/backends/monet5/Tests/cfunction01.sql
+++ b/sql/backends/monet5/Tests/cfunction01.sql
@@ -16,7 +16,7 @@ END;
 select * from functions where name ='aggr01';
 
 -- a continuous function can be called used like any other function?
-select aggr01();  -- causes error
+select aggr01(); #should return 0
 
 start continuous function aggr01();
 call cquery.wait(1000); #wait to be started
@@ -35,6 +35,7 @@ insert into ftmp values(2),(2);
 insert into ftmp values(3),(3);
 
 resume continuous aggr01;
+call cquery.wait(1000);
 select aggr01(); #should return 6
 
 call cquery.wait(1000);
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.err 
b/sql/backends/monet5/Tests/cfunction01.stable.err
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cfunction01.stable.err
@@ -0,0 +1,34 @@
+stderr of test 'cfunction01` in directory 'sql/backends/monet5` itself:
+
+
+# 16:27:02 >  
+# 16:27:02 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=36085" "--set" 
"mapi_usock=/var/tmp/mtest-2361/.s.monetdb.36085" "--set" "monet_prompt=" 
"--forcemito" 
"--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5"
+# 16:27:02 >  
+
+# builtin opt  gdk_dbpath = 
/home/ferreira/MonetDB-trails/BUILD/var/monetdb5/dbfarm/demo
+# builtin opt  gdk_debug = 0
+# builtin opt  gdk_vmtrim = no
+# builtin opt  monet_prompt = >
+# builtin opt  monet_daemon = no
+# builtin opt  mapi_port = 50000
+# builtin opt  mapi_open = false
+# builtin opt  mapi_autosense = false
+# builtin opt  sql_optimizer = default_pipe
+# builtin opt  sql_debug = 0
+# cmdline opt  gdk_nr_threads = 0
+# cmdline opt  mapi_open = true
+# cmdline opt  mapi_port = 36085
+# cmdline opt  mapi_usock = /var/tmp/mtest-2361/.s.monetdb.36085
+# cmdline opt  monet_prompt = 
+# cmdline opt  gdk_dbpath = 
/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5
+# cmdline opt  gdk_debug = 536870922
+
+# 16:27:02 >  
+# 16:27:02 >  "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" 
"--host=/var/tmp/mtest-2361" "--port=36085"
+# 16:27:02 >  
+
+
+# 16:27:07 >  
+# 16:27:07 >  "Done."
+# 16:27:07 >  
+
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.out 
b/sql/backends/monet5/Tests/cfunction01.stable.out
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cfunction01.stable.out
@@ -0,0 +1,136 @@
+stdout of test 'cfunction01` in directory 'sql/backends/monet5` itself:
+
+
+# 16:27:02 >  
+# 16:27:02 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=36085" "--set" 
"mapi_usock=/var/tmp/mtest-2361/.s.monetdb.36085" "--set" "monet_prompt=" 
"--forcemito" 
"--dbpath=/home/ferreira/MonetDB-trails/BUILD/var/MonetDB/mTests_sql_backends_monet5"
+# 16:27:02 >  
+
+# MonetDB 5 server v11.28.0
+# This is an unreleased version
+# Serving database 'mTests_sql_backends_monet5', using 8 threads
+# Compiled for x86_64-pc-linux-gnu/64bit with 128bit integers
+# Found 15.498 GiB available main-memory.
+# Copyright (c) 1993-July 2008 CWI.
+# Copyright (c) August 2008-2017 MonetDB B.V., all rights reserved
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to