Changeset: adcf629c344a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=adcf629c344a
Modified Files:
        sql/backends/monet5/Tests/cqstream02.sql
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_execute.c
Branch: trails
Log Message:

Avoid interference with concurrent procedure runs


diffs (136 lines):

diff --git a/sql/backends/monet5/Tests/cqstream02.sql b/sql/backends/monet5/Tests/cqstream02.sql
--- a/sql/backends/monet5/Tests/cqstream02.sql
+++ b/sql/backends/monet5/Tests/cqstream02.sql
@@ -11,13 +11,14 @@ begin
         set tmp_total = tmp_total + (select sum(val) from sys.stmp10),
             tmp_count = tmp_count + (select count(*) from sys.stmp10);
 end;
-start continuous sys.cq_collector();
 
 insert into stmp10 values('2005-09-23 12:34:26.000',1,9.0);
 insert into stmp10 values('2005-09-23 12:34:27.000',1,11.0);
 insert into stmp10 values('2005-09-23 12:34:28.000',1,13.0);
 insert into stmp10 values('2005-09-23 12:34:28.000',1,15.0);
 
+start continuous sys.cq_collector();
+
 -- Run the query a few times
 call cquery.cycles(3);
 
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -641,7 +641,6 @@ CQresumeInternalRanges(int first, int la
        fprintf(stderr, "#resume scheduler\n");
 #endif
        for( ; first < last; first++)
-       if( pnet[first].status == CQPAUSE )
                pnet[first].status = CQWAIT;
 
        /* start the scheduler if needed */
@@ -663,8 +662,10 @@ CQresumeInternal(str modnme, str fcnnme)
                msg = createException(SQL, "cquery.resume", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme);
                goto finish;
        }
+       if( pnet[idx].status != CQPAUSE)
+               goto finish;
        msg = CQresumeInternalRanges(idx, idx+1);
-       finish:
+finish:
        MT_lock_unset(&ttrLock);
        return msg;
 }
@@ -720,6 +721,14 @@ CQpauseInternal(str modnme, str fcnnme)
                msg = createException(SQL, "cquery.pause", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme);
                goto finish;
        }
+       // actually wait if the query was running
+       while( pnet[idx].status == CQRUNNING ){
+               MT_lock_unset(&ttrLock);
+               MT_sleep_ms(10);  
+               MT_lock_set(&ttrLock);
+               if( pnet[idx].status == CQWAIT)
+                       break;
+       }
        msg = CQpauseInternalRanges(idx, idx+1);
 finish:
        MT_lock_unset(&ttrLock);
@@ -883,8 +892,18 @@ CQderegisterInternal(str modnme, str fcn
                msg = createException(SQL, "cquery.deregister", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme);
                goto finish;
        }
-       if(idx < pnettop) //remove from the petrinet only when found
-               msg = CQderegisterInternalRanges(idx, idx+1);
+       if(idx == pnettop) 
+               goto finish;
+
+       // actually wait if the query was running
+       while( pnet[idx].status == CQRUNNING ){
+               MT_lock_unset(&ttrLock);
+               MT_sleep_ms(10);  
+               MT_lock_set(&ttrLock);
+               if( pnet[idx].status == CQWAIT)
+                       break;
+       }
+       msg = CQderegisterInternalRanges(idx, idx+1);
 
 finish:
        MT_lock_unset(&ttrLock);
@@ -985,7 +1004,7 @@ CQexecute( Client cntxt, int idx)
                fprintf(stderr, "#cquery.execute %s.%s finised %s\n", node->mod, node->fcn, (msg?msg:""));
 #endif
        MT_lock_set(&ttrLock);
-       if( node->status != CQPAUSE)
+       if( node->status != CQPAUSE && node->status != CQSTOP)
                node->status = CQWAIT;
        MT_lock_unset(&ttrLock);
 }
@@ -1091,6 +1110,13 @@ CQscheduler(void *dummy)
 
                        t = GDKusec();
                        // Fork MAL execution thread 
+                       MT_lock_set(&ttrLock); 
+                       if (pnet[i].status != CQWAIT){
+                               MT_lock_unset(&ttrLock); 
+                               goto wrapup;
+                       }
+                       pnet[i].status = CQRUNNING;
+                       MT_lock_unset(&ttrLock); 
                        CQexecute(cntxt, i);
 /*
                                if (MT_create_thread(&pnet[i].tid, CQexecute, (void*) (pnet+i), MT_THR_JOINABLE) < 0){
@@ -1114,6 +1140,7 @@ CQscheduler(void *dummy)
                        }
                        delay = cycleDelay;
                }
+wrapup:
                /* after one sweep all threads should be released */
 /*
                for (m = 0; m < k; m++)
diff --git a/sql/backends/monet5/sql_execute.c b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -360,20 +360,20 @@ SQLrun(Client c, backend *be, mvc *m){
                                break;
                        case mod_stop_continuous:
                                //mnstr_printf(c->fdout, "#Stop continuous query\n");
+                               CQderegister(c,mb, 0,0);
                                m->continuous = 0;
-                               CQderegister(c,mb, 0,0);
                                msg = MAL_SUCCEED;
                                break;
                        case mod_pause_continuous:
                                //mnstr_printf(c->fdout, "#Pause continuous query\n");
+                               CQpause(c,mb, 0,0);
                                m->continuous = 0;
-                               CQpause(c,mb, 0,0);
                                msg = MAL_SUCCEED;
                                break;
                        case mod_resume_continuous:
                                //mnstr_printf(c->fdout, "#Resume continuous query\n");
+                               CQresume(c,mb, 0,0);
                                m->continuous = 0;
-                               CQresume(c,mb, 0,0);
                                msg = MAL_SUCCEED;
                                break;
                        default:
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to