Changeset: ae4e6e2095ec for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae4e6e2095ec
Modified Files:
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:

Add a private MAL client per continuous query
always make the errors table accessible


diffs (246 lines):

diff --git a/sql/backends/monet5/iot/50_iot.sql 
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -94,9 +94,9 @@ create function iot.outputs()
  returns table( "s" string, "t" string, "sch" string, "qry" string)
  external name iot.outputplaces;
 
--- create function iot.errors()
--- returns table( "schema" string,  "table" string, error string)
--- external name iot.errors;
+create function iot.errors()
+returns table( "table" string, error string)
+external name iot.errors;
 
 -- tables for iotwebserver
 
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -74,8 +74,8 @@ BSKTclean(int idx)
                baskets[idx].table_name = NULL;
 
                BBPreclaim(baskets[idx].errors);
+               baskets[idx].errors = NULL;
                baskets[idx].winstride = -1;
-               baskets[idx].errors = NULL;
                baskets[idx].count = 0;
        }
        for(idx = 1; idx < bsktTop; idx++){
@@ -85,8 +85,8 @@ BSKTclean(int idx)
                baskets[idx].table_name = NULL;
 
                BBPreclaim(baskets[idx].errors);
+               baskets[idx].errors = NULL;
                baskets[idx].winstride = -1;
-               baskets[idx].errors = NULL;
                baskets[idx].count = 0;
        }
 }
@@ -607,15 +607,11 @@ BSKTdump(void *ret)
        int bskt;
        BUN cnt;
        BAT *b;
-       mvc *m = NULL;
        str msg = MAL_SUCCEED;
 
        mnstr_printf(GDKout, "#baskets table\n");
        for (bskt = 1; bskt < bsktLimit; bskt++)
                if (baskets[bskt].table_name) {
-                       msg = getSQLContext(mal_clients, 0, &m, NULL);
-                       if ( msg != MAL_SUCCEED)
-                               break;
                        cnt = 0;
                        b = baskets[bskt].bats[0];
                        if( b)
@@ -684,6 +680,48 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
 }
 
 str
+BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    int *res = getArgReference_int(stk, pci, 0);
+    str sname = *getArgReference_str(stk, pci, 2);
+    str tname = *getArgReference_str(stk, pci, 3);
+    str cname = *getArgReference_str(stk, pci, 4);
+    bat rows = *getArgReference_bat(stk, pci, 5);
+    bat val = *getArgReference_bat(stk, pci, 6);
+    BAT *bn=0, *rid=0, *bval = 0;
+       int bskt;
+
+       return 0;
+       (void) cntxt;
+       (void) mb;
+    *res = 0;
+
+    rid = BATdescriptor(rows);
+       if( rid == NULL)
+        throw(SQL, "basket.append", "Cannot access source oid descriptor");
+    bval = BATdescriptor(val);
+       if( bval == NULL){
+               BBPunfix(rid->batCacheid);
+        throw(SQL, "basket.append", "Cannot access source descriptor");
+       }
+
+       bskt = BSKTlocate(sname,tname);
+       if( bskt == 0)
+               throw(SQL, "basket.append", "Cannot access basket descriptor 
%s.%s",sname,tname);
+       bn = BSKTbindColumn(sname,tname,cname);
+
+       if( bn){
+               void_replace_bat(bn, rid, bval, TRUE);
+               BATderiveProps(bn, FALSE);
+       } else throw(SQL, "basket.append", "Cannot access target column 
%s.%s.%s",sname,tname,cname);
+       
+       baskets[bskt].status = BSKTFILLED;
+       BBPunfix(rid->batCacheid);
+       BBPunfix(bval->batCacheid);
+       return MAL_SUCCEED;
+}
+
+str
 BSKTreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
     lng *res = getArgReference_lng(stk, pci, 0);
@@ -894,7 +932,7 @@ BSKTtableerrors(bat *nameId, bat *errorI
        }
 
        for (i = 1; i < bsktTop; i++)
-               if (BATcount(baskets[i].errors) > 0) {
+               if (baskets[i].errors && BATcount(baskets[i].errors) > 0) {
                        bi = bat_iterator(baskets[i].errors);
                        BATloop(baskets[i].errors, p, q)
                        {
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -89,6 +89,7 @@ iot_export int BSKTlocate(str sch, str t
 iot_export int BSKTunlocate(str sch, str tbl);
 iot_export int BSKTlocate(str sch, str tbl);
 iot_export str BSKTappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTimportInternal(Client cntxt, int bskt);
 iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -171,12 +171,20 @@ static void
 IOTreceptorThread(void *dummy)
 {
        int idx = *(int*)dummy;
+       Client cntxt = MCinitClient(0, mal_clients[0].fdin, 
mal_clients[0].fdout);
+
+       if( cntxt == NULL)
+               return;
+       //SQLinitClient(cntxt);
     _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s started for %s\n",
                baskets[idx].schema_name, 
                baskets[idx].table_name, 
                baskets[idx].source);
        /* continously scan the container for baskets */
-               BSKTimportInternal(mal_clients, idx);
+               BSKTimportInternal(cntxt, idx);
+    _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s imported the  
file\n",
+               baskets[idx].schema_name, 
+               baskets[idx].table_name);
 }
 
 str
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -62,6 +62,7 @@ typedef struct {
        str fcnname;
        MalBlkPtr mb;       /* Query block */
        MalStkPtr stk;          /* might be handy */
+       Client client;          /* MAL client context for this query */
 
        int status;     /* query status waiting/running/paused */
        int enabled;    /* all baskets are available */
@@ -164,6 +165,13 @@ PNregisterInternal(Client cntxt, MalBlkP
        pnet[pnettop].mb = nmb;
        pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
 
+       pnet[pnettop].client = MCinitClient(0,0,0);
+       if ( pnet[pnettop].client == NULL)
+               throw(MAL,"petrinet.register","Failed to create client record 
for continous query");
+       msg = SQLinitClient(pnet[pnettop].client);
+       if( msg)
+               return msg;
+       
        pnet[pnettop].status = PNWAIT;
        pnet[pnettop].cycles = 0;
        pnet[pnettop].seen = *timestamp_nil;
@@ -238,12 +246,15 @@ PNstop(void){
        int i,cnt;
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler being stopped\n");
 
-       pnstatus = PNSTOP;
+       pnstatus = PNSTOP; // avoid starting new continuous queries
+       for(cnt=0,  i = 0; i < pnettop; i++)
+       if( pnet[i].client )
+               pnet[i].client->itrace ='x';
+
        do{
                MT_sleep_ms(20);
-               for(cnt=0,  i = 0; i < pnettop; i++){
-                       cnt += pnet[i].status == PNRUNNING;
-               }
+               for(cnt=0,  i = 0; i < pnettop; i++)
+                       cnt += pnet[i].status != PNWAIT;
        } while(cnt);
        BSKTclean(0);
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#all queries stopped \n");
@@ -271,6 +282,7 @@ PNderegister(Client cntxt, MalBlkPtr mb,
                }
                GDKfree(pnet[i].modname);
                GDKfree(pnet[i].fcnname);
+               MCcloseClient(pnet[i].client);
                for( ; i <pnettop-1;i++)
                        pnet[i]= pnet[i+1];
                memset((void*) (pnet+i), 0, sizeof(PNnode));
@@ -282,6 +294,7 @@ PNderegister(Client cntxt, MalBlkPtr mb,
        for ( i = 0; i < pnettop; i++){
                GDKfree(pnet[i].modname);
                GDKfree(pnet[i].fcnname);
+               MCcloseClient(pnet[i].client);
                memset((void*) (pnet+i), 0, sizeof(PNnode));
        }
        pnettop = 0;
@@ -395,7 +408,7 @@ PNexecute( void *n)
 
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all 
locked\n",node->modname, node->fcnname);
 
-       msg = runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
+       msg = runMALsequence(node->client, node->mb, 1, 0, node->stk, 0, 0);
 
        _DEBUG_PETRINET_ 
                mnstr_printf(PNout, "#petrinet.execute %s.%s transition 
done:%s\n",
@@ -425,8 +438,16 @@ PNscheduler(void *dummy)
        timestamp ts, tn;
 
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n");
-       cntxt = mal_clients; /* run as admin in SQL mode*/
-        if( strcmp(cntxt->scenario, "sql") )
+       cntxt = MCinitClient(0,0,0); /* run as admin in SQL mode*/
+       if( cntxt){
+               if( SQLinitClient(cntxt) != MAL_SUCCEED)
+                       GDKerror("Could not initialize PNscheduler");
+       }else{
+               GDKerror("Could not initialize PNscheduler");
+               return;
+       }
+               
+        if( cntxt->scenario == NULL || strcmp(cntxt->scenario, "sql") )
                 SQLinitEnvironment(cntxt, NULL, NULL, NULL);
 
        pnstatus = PNRUNNING; // global state 
@@ -545,6 +566,7 @@ PNscheduler(void *dummy)
                }
 
        }
+       MCcloseClient(cntxt);
        pnstatus = PNINIT;
        _DEBUG_PETRINET_ mnstr_flush(PNout);
        (void) dummy;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to