Changeset: 931c38cd87bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=931c38cd87bc
Modified Files:
        sql/backends/monet5/iot/Tests/bug05.sql
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/Tests/iot02.sql
        sql/backends/monet5/iot/Tests/iot06.stable.out
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
Branch: iot
Log Message:

Fixing bounds§


diffs (truncated from 621 to 300 lines):

diff --git a/sql/backends/monet5/iot/Tests/bug05.sql 
b/sql/backends/monet5/iot/Tests/bug05.sql
--- a/sql/backends/monet5/iot/Tests/bug05.sql
+++ b/sql/backends/monet5/iot/Tests/bug05.sql
@@ -17,7 +17,7 @@ INSERT INTO testing VALUES (now(), 2, 2)
 INSERT INTO testing VALUES (now(), 3, 3);
 
 CALL iot.show('sys', 'cquery');
-CALL iot.stop();
+CALL iot.pause();
 
 DROP PROCEDURE cquery;
 DROP TABLE testout;
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql 
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -35,7 +35,7 @@ select * from result;
 --select * from  iot.baskets();
 --select * from  iot.queries();
 select * from iot.errors();
-call iot.stop();
+call iot.pause();
 drop procedure cq00;
 drop table stmp;
 drop table result;
diff --git a/sql/backends/monet5/iot/Tests/iot02.sql 
b/sql/backends/monet5/iot/Tests/iot02.sql
--- a/sql/backends/monet5/iot/Tests/iot02.sql
+++ b/sql/backends/monet5/iot/Tests/iot02.sql
@@ -24,14 +24,15 @@ insert into stmp2 values('2005-09-23 12:
 call iot.resume('iot','cq02');
 
 -- wait for 5 seconds for handler
-call iot.wait(5000);
 
 select 'RESULT';
+--call iot.cycles('iot','cq02',4);
+call iot.wait(2000);
 select * from stmp2;
 select * from result1;
 select * from result2;
 
-call iot.stop();
+call iot.pause();
 select * from iot.errors();
 drop procedure cq02;
 drop table stmp2;
diff --git a/sql/backends/monet5/iot/Tests/iot06.stable.out 
b/sql/backends/monet5/iot/Tests/iot06.stable.out
--- a/sql/backends/monet5/iot/Tests/iot06.stable.out
+++ b/sql/backends/monet5/iot/Tests/iot06.stable.out
@@ -39,24 +39,24 @@ Ready.
 % clob # type
 % 63 # length
 unsafe function iot.cq06():void;
-    X_1 := sql.mvc();
-    X_33 := basket.register(X_1,"iot","tmp06",0);
-barrier X_56 := language.dataflow();
-    C_2:bat[:oid] := basket.tid(X_1,"iot","tmp06");
-    X_5:bat[:timestamp] := basket.bind(X_33,"iot","tmp06","t");
-    X_8 := aggr.min(X_5);
-exit X_56;
-    X_9 := sql.append(X_33,"iot","result","t",X_8);
-    X_11 := aggr.count(X_5);
-    X_12 := calc.int(X_11);
-    X_13 := sql.append(X_9,"iot","result","sensor",X_12);
-    X_15:bat[:int] := basket.bind(X_13,"iot","tmp06","val");
-    X_17:bat[:dbl] := batcalc.dbl(2,X_15);
-    X_19:dbl := aggr.avg(X_17);
-    X_20 := calc.int(X_19,8,2);
-    X_22 := sql.append(X_13,"iot","result","val",X_20);
-    X_34 := basket.tumble(X_22,"iot","tmp06");
-    basket.commit(X_34,"iot","tmp06");
+    X_0 := sql.mvc();
+    X_32 := basket.register(X_0,"iot","tmp06",0);
+barrier X_60 := language.dataflow();
+    C_1:bat[:oid] := basket.tid(X_0,"iot","tmp06");
+    X_4:bat[:timestamp] := basket.bind(X_32,"iot","tmp06","t");
+    X_8 := aggr.min(X_4);
+exit X_60;
+    X_10 := sql.append(X_32,"iot","result","t",X_8);
+    X_12 := aggr.count(X_4);
+    X_13 := calc.int(X_12);
+    X_15 := sql.append(X_10,"iot","result","sensor",X_13);
+    X_17:bat[:int] := basket.bind(X_15,"iot","tmp06","val");
+    X_20:bat[:dbl] := batcalc.dbl(2,X_17);
+    X_24:dbl := aggr.avg(X_20);
+    X_25 := calc.int(X_24,8,2);
+    X_28 := sql.append(X_15,"iot","result","val",X_25);
+    X_36 := basket.tumble(X_28,"iot","tmp06");
+    basket.commit(X_36,"iot","tmp06");
 catch SQLexception:str;
     iot.error("user","cq06",SQLexception);
 exit SQLexception:str;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -352,6 +352,8 @@ BSKTtid(Client cntxt, MalBlkPtr mb, MalS
        if( bskt == 0)  
                throw(SQL,"basket.bind","Stream table column '%s.%s' not 
found\n",sch,tbl);
        b = baskets[bskt].bats[0];
+       if( b == 0)
+               throw(SQL,"basket.bind","Stream table reference column '%s.%s' 
not accessible\n",sch,tbl);
 
     tids = COLnew(0, TYPE_void, 0, TRANSIENT);
     if (tids == NULL)
@@ -464,6 +466,7 @@ BSKTimportInternal(Client cntxt, int bsk
                assert( b);
                bcnt = BATcount(b);
 
+               if( fsize > 0)
                switch(ATOMstorage(b->ttype)){
                case TYPE_bit:
                case TYPE_bte:
@@ -657,9 +660,9 @@ BSKTexport(Client cntxt, MalBlkPtr mb, M
 }
 
 /* remove tuples from a basket according to the sliding policy */
-#define ColumnShift(B,TPE, STRIDE) { \
+#define ColumnShift(B,TPE, CNT) { \
        TPE *first= (TPE*) Tloc(B, 0);\
-       TPE *n = first+STRIDE;\
+       TPE *n = first+CNT;\
        TPE *last=  (TPE*) Tloc(B, BUNlast(B));\
        for( ; n < last; n++, first++)\
                *first=*n;\
@@ -710,7 +713,9 @@ BSKTtumbleInternal(Client cntxt, str sch
                }
                if( stride == -1)
                        BATsetcount(b, 0);
-               else BATsetcount(b, BATcount(b)-cnt);
+               else 
+               if( BATcount(b) >= cnt)
+                       BATsetcount(b, BATcount(b)-cnt);
                if( BATcount(b) == 0){
                        baskets[bskt].status = BSKTWAIT;
                }
@@ -974,8 +979,10 @@ BSKTreset(Client cntxt, MalBlkPtr mb, Ma
        MT_lock_set(&baskets[idx].lock);
        for( i=0; baskets[idx].cols[i]; i++){
                b = baskets[idx].bats[i];
-               if(b)
+               if(b){
                        BATsetcount(b,0);
+                       BATsettrivprop(b);
+               }
        }
        baskets[idx].status = BSKTWAIT;
        MT_lock_unset(&baskets[idx].lock);
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -63,7 +63,6 @@ typedef struct {
        str fcnname;
        MalBlkPtr mb;       /* Query block */
        MalStkPtr stk;          /* might be handy */
-       Client client;          /* MAL client context for this query */
 
        int status;     /* query status waiting/running/paused */
        int enabled;    /* all baskets are available */
@@ -192,7 +191,9 @@ PNregisterInternal(Client cntxt, MalBlkP
        Symbol s;
        char buf[IDLENGTH];
 
-       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#registerInternal status %d\n", 
init);
+#ifdef DEBUG_PETRINET
+       mnstr_printf(GDKout, "#registerInternal status %d\n", init);
+#endif
        if (pnettop == MAXPN) 
                GDKerror("petrinet.register:Too many transitions");
 
@@ -218,20 +219,13 @@ PNregisterInternal(Client cntxt, MalBlkP
        setArgType(nmb,q, 0, TYPE_void);
        pushEndInstruction(nmb);
        chkProgram(cntxt->fdout, cntxt->nspace, nmb);
-       _DEBUG_PETRINET_ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL);
+#ifdef DEBUG_PETRINET
+       printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL);
+#endif
 
        pnet[pnettop].mb = nmb;
        pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
 
-       if(pnet[pnettop].client == NULL) {
-               pnet[pnettop].client = MCinitClient(0,0,0);
-               if (pnet[pnettop].client == NULL)
-                       throw(MAL,"petrinet.register","Failed to create client 
record for continous query\n");
-               msg = SQLinitClient(pnet[pnettop].client);
-               if(msg)
-                       return msg;
-       }
-
        pnet[pnettop].status = PNWAIT;
        pnet[pnettop].limit = calls; 
        pnet[pnettop].seen = *timestamp_nil;
@@ -266,13 +260,17 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
                        throw(SQL,"iot.pause","Continuous query %s.%s not 
found\n", modname, fcnname);
                }
                pnet[i].status = newstatus;
-               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s 
%s\n", modname, fcnname, statusname[newstatus]);
+#ifdef DEBUG_PETRINET
+       mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", modname, fcnname, 
statusname[newstatus]);
+#endif
                MT_lock_unset(&iotLock);
                return MAL_SUCCEED;
        }
        for ( i = 0; i < pnettop; i++){
                pnet[i].status = newstatus;
-               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s: 
%s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
+#ifdef DEBUG_PETRINET
+               mnstr_printf(GDKout, "#scheduler status %s.%s: %s\n", 
pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
+#endif
        }
        MT_lock_unset(&iotLock);
        return MAL_SUCCEED;
@@ -280,28 +278,39 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
 
 str
 PNresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#resume scheduler\n");
+#ifdef DEBUG_PETRINET
+       mnstr_printf(GDKout, "#resume scheduler\n");
+#endif
        return PNstatus(cntxt, mb, stk, pci, PNWAIT);
 }
 
 str
 PNpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#pause scheduler\n");
+#ifdef DEBUG_PETRINET
+       mnstr_printf(GDKout, "#pause scheduler\n");
+#endif
        return PNstatus(cntxt, mb, stk, pci, PNPAUSED);
 }
 
 str
 PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
+#ifdef DEBUG_PETRINET
        int old = PNcycle;
+#endif
        int delay= *getArgReference_int(stk,pci,1);
        lng clk = GDKms();
 
+       (void) cntxt;
        (void) mb;
-       _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#scheduler wait %d 
ms\n",delay);
+#ifdef DEBUG_PETRINET
+       mnstr_printf(cntxt->fdout, "#scheduler wait %d ms\n",delay);
+#endif
        delay = delay < PNDELAY? 2*PNDELAY:delay;
        while( GDKms() < clk + delay )
                MT_sleep_ms(PNDELAY);
-       _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#wait finished after %d 
cycles\n",PNcycle -old );
+#ifdef DEBUG_PETRINET
+       mnstr_printf(cntxt->fdout, "#wait finished after %d cycles\n",PNcycle 
-old );
+#endif
        return MAL_SUCCEED;
 }
 
@@ -312,7 +321,6 @@ PNderegisterInternal(int i){
        MT_lock_set(&iotLock);
        GDKfree(pnet[i].modname);
        GDKfree(pnet[i].fcnname);
-       //MCcloseClient(pnet[i].client);
        memset((void*) (pnet+i), 0, sizeof(PNnode));
        for( ; i<pnettop-1; i++)
                pnet[i] = pnet[i+1];
@@ -338,25 +346,30 @@ PNderegister(Client cntxt, MalBlkPtr mb,
                        throw(SQL,"iot.pause","Continuous query %s.%s not 
found\n", modname, fcnname);
                }
                PNderegisterInternal(i);
-               _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered 
%s.%s\n", modname, fcnname);
+#ifdef DEBUG_PETRINET
+               mnstr_printf(GDKout, "#scheduler deregistered %s.%s\n", 
modname, fcnname);
+#endif
                return MAL_SUCCEED;
        }
        for ( i = pnettop-1; i >= 0 ; i--)
                PNderegisterInternal(i);
-       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered all\n");
+#ifdef DEBUG_PETRINET
+       mnstr_printf(GDKout, "#scheduler deregistered all\n");
+#endif
        return MAL_SUCCEED;
 }
 
 /* safely stop the engine by stopping all CQ firt */
 str
 PNstop(void){
-       int i, cnt,limit = 20;
-       _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler being stopped\n");
+       int i, cnt,limit = 200;
+#ifdef DEBUG_PETRINET
+       mnstr_printf(GDKout, "#scheduler being stopped\n");
+#endif
 
+       MT_lock_set(&iotLock);
        pnstatus = PNSTOP; // avoid starting new continuous queries
-       for( i = 0; i < pnettop; i++)
-       if( pnet[i].client )
-               pnet[i].client->itrace ='x';
+       MT_lock_unset(&iotLock);
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to