Changeset: adcf629c344a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=adcf629c344a
Modified Files:
	sql/backends/monet5/Tests/cqstream02.sql
	sql/backends/monet5/sql_cquery.c
	sql/backends/monet5/sql_execute.c
Branch: trails
Log Message:

Avoid interference with concurrent procedure runs diffs (136 lines): diff --git a/sql/backends/monet5/Tests/cqstream02.sql b/sql/backends/monet5/Tests/cqstream02.sql --- a/sql/backends/monet5/Tests/cqstream02.sql +++ b/sql/backends/monet5/Tests/cqstream02.sql @@ -11,13 +11,14 @@ begin set tmp_total = tmp_total + (select sum(val) from sys.stmp10), tmp_count = tmp_count + (select count(*) from sys.stmp10); end; -start continuous sys.cq_collector(); insert into stmp10 values('2005-09-23 12:34:26.000',1,9.0); insert into stmp10 values('2005-09-23 12:34:27.000',1,11.0); insert into stmp10 values('2005-09-23 12:34:28.000',1,13.0); insert into stmp10 values('2005-09-23 12:34:28.000',1,15.0); +start continuous sys.cq_collector(); + -- Run the query a few times call cquery.cycles(3); diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c --- a/sql/backends/monet5/sql_cquery.c +++ b/sql/backends/monet5/sql_cquery.c @@ -641,7 +641,6 @@ CQresumeInternalRanges(int first, int la fprintf(stderr, "#resume scheduler\n"); #endif for( ; first < last; first++) - if( pnet[first].status == CQPAUSE ) pnet[first].status = CQWAIT; /* start the scheduler if needed */ @@ -663,8 +662,10 @@ CQresumeInternal(str modnme, str fcnnme) msg = createException(SQL, "cquery.resume", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme); goto finish; } + if( pnet[idx].status != CQPAUSE) + goto finish; msg = CQresumeInternalRanges(idx, idx+1); - finish: +finish: MT_lock_unset(&ttrLock); return msg; } @@ -720,6 +721,14 @@ CQpauseInternal(str modnme, str fcnnme) msg = createException(SQL, "cquery.pause", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme); goto finish; } + // actually wait if the query was running + while( pnet[idx].status == CQRUNNING ){ + MT_lock_unset(&ttrLock); + MT_sleep_ms(10); + MT_lock_set(&ttrLock); + if( pnet[idx].status == CQWAIT) + break; + } msg = CQpauseInternalRanges(idx, idx+1); finish: MT_lock_unset(&ttrLock); @@ -883,8 +892,18 
@@ CQderegisterInternal(str modnme, str fcn msg = createException(SQL, "cquery.deregister", "Continuous procedure %s.%s not accessible\n", modnme, fcnnme); goto finish; } - if(idx < pnettop) //remove from the petrinet only when found - msg = CQderegisterInternalRanges(idx, idx+1); + if(idx == pnettop) + goto finish; + + // actually wait if the query was running + while( pnet[idx].status == CQRUNNING ){ + MT_lock_unset(&ttrLock); + MT_sleep_ms(10); + MT_lock_set(&ttrLock); + if( pnet[idx].status == CQWAIT) + break; + } + msg = CQderegisterInternalRanges(idx, idx+1); finish: MT_lock_unset(&ttrLock); @@ -985,7 +1004,7 @@ CQexecute( Client cntxt, int idx) fprintf(stderr, "#cquery.execute %s.%s finised %s\n", node->mod, node->fcn, (msg?msg:"")); #endif MT_lock_set(&ttrLock); - if( node->status != CQPAUSE) + if( node->status != CQPAUSE && node->status != CQSTOP) node->status = CQWAIT; MT_lock_unset(&ttrLock); } @@ -1091,6 +1110,13 @@ CQscheduler(void *dummy) t = GDKusec(); // Fork MAL execution thread + MT_lock_set(&ttrLock); + if (pnet[i].status != CQWAIT){ + MT_lock_unset(&ttrLock); + goto wrapup; + } + pnet[i].status = CQRUNNING; + MT_lock_unset(&ttrLock); CQexecute(cntxt, i); /* if (MT_create_thread(&pnet[i].tid, CQexecute, (void*) (pnet+i), MT_THR_JOINABLE) < 0){ @@ -1114,6 +1140,7 @@ CQscheduler(void *dummy) } delay = cycleDelay; } +wrapup: /* after one sweep all threads should be released */ /* for (m = 0; m < k; m++) diff --git a/sql/backends/monet5/sql_execute.c b/sql/backends/monet5/sql_execute.c --- a/sql/backends/monet5/sql_execute.c +++ b/sql/backends/monet5/sql_execute.c @@ -360,20 +360,20 @@ SQLrun(Client c, backend *be, mvc *m){ break; case mod_stop_continuous: //mnstr_printf(c->fdout, "#Stop continuous query\n"); + CQderegister(c,mb, 0,0); m->continuous = 0; - CQderegister(c,mb, 0,0); msg = MAL_SUCCEED; break; case mod_pause_continuous: //mnstr_printf(c->fdout, "#Pause continuous query\n"); + CQpause(c,mb, 0,0); m->continuous = 0; - CQpause(c,mb, 0,0); 
msg = MAL_SUCCEED; break; case mod_resume_continuous: //mnstr_printf(c->fdout, "#Resume continuous query\n"); + CQresume(c,mb, 0,0); m->continuous = 0; - CQresume(c,mb, 0,0); msg = MAL_SUCCEED; break; default: _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list