Changeset: 6b0bc7248043 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6b0bc7248043
Modified Files:
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/petrinet.c
        sql/include/sql_catalog.h
        sql/server/rel_schema.c
Branch: iot
Log Message:

Use basic relational tables.
Make sure you lock the baskets before using them.


diffs (200 lines):

diff --git a/sql/backends/monet5/iot/Tests/iot00.sql 
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -12,6 +12,8 @@ end;
 call iot.query('iot','cq00');
 call iot.query('insert into iot.result select min(t), count(*), avg(val) from 
iot.stream_tmp;');
 
+--insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34);
+
 select * from  iot.baskets();
 select * from  iot.queries();
 select * from  iot.inputplaces();
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -110,8 +110,14 @@ BSKTnewbasket(sql_schema *s, sql_table *
        baskets[idx].seen = * timestamp_nil;
 
        baskets[idx].count = 0;
-       for (o = t->columns.set->h; o; o = o->next)
+       for (o = t->columns.set->h; o; o = o->next){
+        sql_column *col = o->data;
+        int tpe = col->type.type->localtype;
+
+        if (tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || tpe 
== TYPE_timestamp) 
+                       throw(MAL,"baskets.register","Unsupported type");
                baskets[idx].count++;
+       }
        baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
        if (baskets[idx].table_name == NULL ||
            baskets[idx].errors == NULL) {
@@ -280,7 +286,15 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal
     int bskt;
        char buf[BUFSIZ];
        node *n;
+       mvc *m = NULL;
+       BAT *b;
+       int first=1;
+       BUN cnt =0;
+       str msg;
 
+       msg= getSQLContext(cntxt,NULL, &m, NULL);
+       if( msg != MAL_SUCCEED)
+               return msg;
     bskt = BSKTlocate(sch,tbl);
        if (bskt == 0)
                throw(SQL, "iot.push", "Could not find the basket 
%s.%s",sch,tbl);
@@ -290,12 +304,28 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal
                throw(SQL, "iot.push", "Could not access the basket directory 
%s. error %d",dir,errno);
        }
        
+       // types are already checked during stream initialization
+       MT_lock_set(&baskets[bskt].lock);
        for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
                sql_column *c = n->data;
                snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name);
                _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
+               BATattach(c->type.type->localtype,buf,PERSISTENT);
+               b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL);
+               if( b){ 
+                       baskets[bskt].count = BATcount(b);
+                       BBPunfix(b->batCacheid);
+                       if( first){
+                               cnt = BATcount(b);
+                               first = 0;
+                       } else
+                               if( cnt != BATcount(b)){
+                                       MT_lock_unset(&baskets[bskt].lock);
+                                       throw(MAL,"iot.push","Non-aligned 
binary input files");
+                               }
+               }
        }
-    (void) cntxt;
+       MT_lock_unset(&baskets[bskt].lock);
     (void) mb;
     return MAL_SUCCEED;
 }
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -292,14 +292,30 @@ PNanalysis(Client cntxt, MalBlkPtr mb, i
  * experiment with more advanced schemes, e.g., priority queues.
  *
  * During each step cycle we first enable the transformations.
+ *
+ * Locking the streams is necessary to avoid concurrent changes.
+ * Using a fixed order over the basket table, ensure no deadlock.
  */
 static void
 PNexecute( void *n)
 {
        PNnode *node= (PNnode *) n;
-       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed 
%s.%s\n",node->modname, node->fcnname);
+       int j, idx;
+       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute 
%s.%s\n",node->modname, node->fcnname);
+       // first grab exclusive access to all streams.
+       for (j = 0; j < MAXBSKT &&  node->enabled && node->places[j]; j++) {
+               idx = node->places[j];
+               MT_lock_set(&baskets[idx].lock);
+       }
+       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all 
locked\n",node->modname, node->fcnname);
        runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
        node->status = BSKTPAUSE;
+       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s 
transition done\n",node->modname, node->fcnname);
+       for (j = MAXBSKT; j > 0 &&  node->enabled && node->places[j]; j--) {
+               idx = node->places[j];
+               MT_lock_unset(&baskets[idx].lock);
+       }
+       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all 
unlocked\n",node->modname, node->fcnname);
 }
 
 static void
@@ -332,13 +348,13 @@ PNcontroller(void *dummy)
                /* collect latest statistics, note that we don't need a lock 
here,
                   because the count need not be accurate to the usec. It will 
simply
                   come back. We also only have to check the places that are 
marked
-                  empty. */
+                  non empty. */
                for(i=0; i< MAXBSKT; i++)
                        claimed[i]=0;
                now = GDKusec();
                for (k = i = 0; status == BSKTRUNNING && i < pnettop; i++) 
                if ( pnet[i].status != BSKTPAUSE ){
-                       // check if all baskets are available
+                       // check if all baskets are available and non-empty
                        pnet[i].enabled = 1;
                        for (j = 0; j < MAXBSKT &&  pnet[i].enabled && 
pnet[i].places[j]; j++) {
                                idx = pnet[i].places[j];
@@ -365,7 +381,9 @@ PNcontroller(void *dummy)
                                                _DEBUG_PETRINET_ 
mnstr_printf(cntxt->fdout, "#petrinet: %s.%s enabled twice,disgarded \n", 
pnet[i].modname, pnet[i].fcnname);
                                                pnet[i].enabled = 0;
                                                break;
-                                       }
+                                       } 
+
+                               /* rule out all others */
                                if( pnet[i].enabled)
                                        for (j = 0; j < MAXBSKT &&  
pnet[i].enabled && pnet[i].places[j]; j++) 
                                                claimed[pnet[i].places[j]]= 1;
@@ -377,10 +395,8 @@ PNcontroller(void *dummy)
                }
                analysis = GDKusec() - now;
 
-               /* execute each enabled transformation */
-               /* We don't need to access again all the factories and check 
again which are available to execute them
-                * we have already kept the enable ones in the enabled list 
(created in the previous loop)
-                * and now it is enough to access that list*/
+               /* Execute each enabled transformation */
+               /* Tricky part is here a single stream used by multiple 
transitions */
                for (m = 0; m < k; m++) {
                        i = enabled[m];
                        if (pnet[i].enabled ) {
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -461,7 +461,7 @@ typedef enum table_types {
        tt_replica_table = 6    /* multiple replica of the same table */
 } table_types;
 
-#define isTable(x)       (x->type==tt_table)
+#define isTable(x)       (x->type==tt_table || x->type == tt_stream)
 #define isView(x)        (x->type==tt_view)
 #define isMergeTable(x)   (x->type==tt_merge_table)
 #define isStream(x)      (x->type==tt_stream)
diff --git a/sql/server/rel_schema.c b/sql/server/rel_schema.c
--- a/sql/server/rel_schema.c
+++ b/sql/server/rel_schema.c
@@ -877,12 +877,12 @@ rel_create_table(mvc *sql, sql_schema *s
        if (sname && !(s = mvc_bind_schema(sql, sname)))
                return sql_error(sql, 02, "3F000!CREATE TABLE: no such schema 
'%s'", sname);
 
-       if (temp != SQL_PERSIST && tt == tt_table && 
+       if (temp != SQL_PERSIST && (tt == tt_table || tt == tt_stream) && 
                        commit_action == CA_COMMIT)
                commit_action = CA_DELETE;
        
        if (temp != SQL_DECLARED_TABLE) {
-               if (temp != SQL_PERSIST && tt == tt_table) {
+               if (temp != SQL_PERSIST && (tt == tt_table || tt == tt_stream)) 
{
                        s = mvc_bind_schema(sql, "tmp");
                        if (temp == SQL_LOCAL_TEMP && sname && strcmp(sname, 
s->base.name) != 0)
                                return sql_error(sql, 02, "3F000!CREATE TABLE: 
local tempory tables should be stored in the '%s' schema", s->base.name);
@@ -922,7 +922,7 @@ rel_create_table(mvc *sql, sql_schema *s
                        if (res == SQL_ERR) 
                                return NULL;
                }
-               temp = (tt == tt_table)?temp:SQL_PERSIST;
+               temp = (tt == tt_table || tt == tt_stream)?temp:SQL_PERSIST;
                return rel_table(sql, DDL_CREATE_TABLE, sname, t, temp);
        } else { /* [col name list] as subquery with or without data */
                sql_rel *sq = NULL, *res = NULL;
@@ -944,7 +944,7 @@ rel_create_table(mvc *sql, sql_schema *s
                }
 
                /* insert query result into this table */
-               temp = (tt == tt_table)?temp:SQL_PERSIST;
+               temp = (tt == tt_table || tt == tt_stream)?temp:SQL_PERSIST;
                res = rel_table(sql, DDL_CREATE_TABLE, sname, t, temp);
                if (with_data) {
                        res = rel_insert(sql, res, sq);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to