Changeset: 849d25275eca for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=849d25275eca
Modified Files:
        MonetDB5/src/mal/mal_interpreter.mx
        MonetDB5/src/mal/mal_recycle.mx
        MonetDB5/src/mal/mal_stack.mx
        MonetDB5/src/optimizer/opt_octopus.mx
        MonetDB5/src/scheduler/run_octopus.mx
Branch: default
Log Message:

1) Fixed local execution with octopus;
2) Refined scheduling of parallel blocks using stack hooks. In particular,
serialize calls to the same peer inside of a parallel block since the
same TCP/IP connection is used.


diffs (truncated from 342 to 300 lines):

diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/mal/mal_interpreter.mx
--- a/MonetDB5/src/mal/mal_interpreter.mx       Mon Jul 05 16:27:13 2010 +0200
+++ b/MonetDB5/src/mal/mal_interpreter.mx       Mon Jul 05 17:25:47 2010 +0200
@@ -1349,10 +1349,10 @@
                for(i = limit-1; i >= pc ; i--)
                if (fs[i].status == DFLOWpending ) {
                        p = getInstrPtr(flow->mb, fs[i].pc);
+                       if(flow->stk->admit == 0 || 
(*flow->stk->admit)(flow->cntxt, flow->mb, flow->stk, p) )
+
                        for ( j= p->retc; j < p->argc; j++)
-                       if ( getArg(p,j)== oa && DFLOWeligible(flow,fs,i,p,pc) 
&&
-                               (f == 0 || f->flow->stk->admit == 0 || 
(*f->flow->stk->admit)(f->flow->cntxt, f->flow->mb, f->flow->stk, 
getInstrPtr(flow->mb, f->pc)) )
-                       ){
+                       if ( getArg(p,j)== oa && DFLOWeligible(flow,fs,i,p,pc) 
){
                                queued++;
                                candidates ++;
                                DFLOWactivate(flow,fs,i,p);
diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/mal/mal_recycle.mx
--- a/MonetDB5/src/mal/mal_recycle.mx   Mon Jul 05 16:27:13 2010 +0200
+++ b/MonetDB5/src/mal/mal_recycle.mx   Mon Jul 05 17:25:47 2010 +0200
@@ -1037,7 +1037,7 @@
        lng cacheLimit;
        QryStatPtr qsp = recycleQPat->ptrn[cntxt->rcc->curQ];
 
-       (void) rd;      
+/*     (void) rd;      */
        RECYCLEspace();
        cacheLimit = recycleCacheLimit?recycleCacheLimit:HARDLIMIT_STMT;
        if ( recycleSize >= cacheLimit)
@@ -1094,10 +1094,11 @@
                /* use the field to refer to the query-owner index in the query 
pattern table */
        pushInstruction(recycleBlk,q);
        i = recycleBlk->stop-1;
-       recycleBlk->profiler[i].rbytes = recycleBlk->profiler[i].clk = 
GDKusec();
+/*     recycleBlk->profiler[i].rbytes = recycleBlk->profiler[i].clk = 
GDKusec(); */
+       recycleBlk->profiler[i].clk = GDKusec();
        recycleBlk->profiler[i].counter =1;
        recycleBlk->profiler[i].ticks = ticks;
-/*     recycleBlk->profiler[i].rbytes = rd; */
+       recycleBlk->profiler[i].rbytes = rd; 
        recycleBlk->profiler[i].wbytes = wr;
        recyclerUsedMemory += wr;
        if (monitorRecycler == 1 ) 
diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/mal/mal_stack.mx
--- a/MonetDB5/src/mal/mal_stack.mx     Mon Jul 05 16:27:13 2010 +0200
+++ b/MonetDB5/src/mal/mal_stack.mx     Mon Jul 05 17:25:47 2010 +0200
@@ -69,6 +69,7 @@
 #define MAXSHARES   8
 
 typedef str (*MALfcn) ();
+typedef int (*DFhook) (void *, void *, void *, void *);
 
 typedef struct MALSTK {
        int stksize;
@@ -83,8 +84,8 @@
 there may be different schemes to take instructions into execution.
 The admission scheme (and wrapup) are the necessary scheduler hooks.
 @h
-       MALfcn admit;
-       MALfcn wrapup;
+       DFhook admit;
+       DFhook wrapup;
        MT_Lock stklock;        /* used for parallel processing */
 @-
 It is handy to administer the timing in the stack frame
diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/optimizer/opt_octopus.mx
--- a/MonetDB5/src/optimizer/opt_octopus.mx     Mon Jul 05 16:27:13 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_octopus.mx     Mon Jul 05 17:25:47 2010 +0200
@@ -237,8 +237,9 @@
        pushArgument(mb, p, varid);
        p->barrier = RETURNsymbol;
        pushEndInstruction(mb);
-       newStmt(mb, optimizerRef, putName("aliases", 7));
-       newStmt(mb, optimizerRef, putName("deadcode", 8));
+       p = newStmt(mb, optimizerRef, putName("aliases", 7));
+       p = pushStr(mb, p, octopusRef);
+       p = pushStr(mb, p, getFunctionId(getInstrPtr(mb, 0)));
 }
 
 static MalBlkPtr 
diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/scheduler/run_octopus.mx
--- a/MonetDB5/src/scheduler/run_octopus.mx     Mon Jul 05 16:27:13 2010 +0200
+++ b/MonetDB5/src/scheduler/run_octopus.mx     Mon Jul 05 17:25:47 2010 +0200
@@ -88,7 +88,7 @@
 #include "mal_instruction.h"
 #include "mal_client.h"
 
-#define DEBUG_RUN_OCTOPUS              /* to trace processing */
+/*#define DEBUG_RUN_OCTOPUS            to trace processing */
 
 #ifdef WIN32
 #ifndef LIBRUN_OCTOPUS
@@ -133,6 +133,7 @@
        Registry nxt; /* list of registered mal functions */
        bte active;
        str conn;
+       int inuse;
 } Peer;
 
 typedef struct {
@@ -150,6 +151,7 @@
 static Worker workers[MAXSITES];  /* registry of workers for the current query 
*/
 static int nrworkers = 0;
 static int nrpeers=0;
+static bte octopusLocal=0;
 bte optTarget = 1; 
 /*sht bidStrategy = 1; 
 #define BID_TRANS      1
@@ -185,7 +187,8 @@
        peers[i].uri = GDKstrdup(uri);
        peers[i].pwd = GDKstrdup("monetdb");
        peers[i].active = 1;
-       peers[i].nxt = NULL;            
+       peers[i].nxt = NULL;
+       peers[i].inuse = 0;             
        nrpeers++;
        return i;
 }
@@ -223,6 +226,10 @@
                msg = RMTconnect(&conn, &peers[i].uri, &peers[i].usr, 
&peers[i].pwd);
                if ( msg == MAL_SUCCEED )
                        peers[i].conn = GDKstrdup(conn);
+               else {
+                       *c = NULL;
+                       return msg;
+               } 
        }
 
        *c = GDKstrdup(peers[i].conn);
@@ -239,51 +246,52 @@
        char buf[BUFSIZ]= "*/octopus", *s= buf;
        int i;
 
-       if (nrworkers ==MAXSITES) 
-               throw(MAL,"octopus.discover",OPERATION_FAILED "No octopus 
worker left");
-
        nrworkers = 0;
+       octopusLocal = 0;
        for (i=0; i<nrpeers; i++)
                peers[i].active = 0;
 
        msg = RMTresolve(&bid,&s);
-       if ( msg != MAL_SUCCEED){
-               /* there is a last resort, yourself */
-               
+       if ( msg == MAL_SUCCEED) {
+               b = BATdescriptor(bid);
+               if ( b != NULL && BATcount(b) > 0 ) {
+                       bi = bat_iterator(b);
+                       BATloop(b,p,q){
+                               str t= (str) BUNtail(bi,p);
+
+                               workers[nrworkers].pnum = OCTOPUSgetPeer(t); 
/*ref to peers registry*/
+                               snprintf(buf,BUFSIZ,"worker_%d",nrworkers);
+                               workers[nrworkers].name = GDKstrdup(buf);
+
+#ifdef DEBUG_RUN_OCTOPUS
+                               stream_printf(cntxt->fdout,"Worker site %d 
%s\n", nrworkers, t);
+#endif
+                               nrworkers++;
+                       }
+               }
+               BBPreleaseref(bid);
+       }
+
+       if ( !nrworkers  ) {
+               /* there is a last resort, local execution */
                SABAOTHgetLocalConnection(&s);
+       
                workers[nrworkers].pnum = OCTOPUSgetPeer(s); /*ref to peers 
registry*/
                snprintf(buf,BUFSIZ,"worker_%d",nrworkers);
                workers[nrworkers].name = GDKstrdup(buf);
        
 #ifdef DEBUG_RUN_OCTOPUS
                stream_printf(cntxt->fdout,"Worker site %d %s\n", nrworkers, s);
+#endif
+               nrworkers++;
+               octopusLocal = 1;
+       }
+
+#ifdef DEBUG_RUN_OCTOPUS
+       stream_printf(cntxt->fdout,"Octopus workers %d\n",nrworkers);
 #else
                (void) cntxt;
 #endif
-               nrworkers++;
-               return MAL_SUCCEED;
-       }
-       b = BATdescriptor(bid);
-       if ( b == NULL)
-               throw(MAL,"octopus.discover",OPERATION_FAILED "No octopus 
worker list available");
-
-       bi= bat_iterator(b);
-       BATloop(b,p,q){
-               str t= (str) BUNtail(bi,p);
-
-               workers[nrworkers].pnum = OCTOPUSgetPeer(t); /*ref to peers 
registry*/
-               snprintf(buf,BUFSIZ,"worker_%d",nrworkers);
-               workers[nrworkers].name = GDKstrdup(buf);
-#ifdef DEBUG_RUN_OCTOPUS
-               stream_printf(cntxt->fdout,"Worker site %d %s\n", nrworkers, t);
-#endif
-               nrworkers++;
-       }
-#ifdef DEBUG_RUN_OCTOPUS
-               stream_printf(cntxt->fdout,"Octopus workers %d\n",nrworkers);
-#endif
-
-       BBPreleaseref(bid);
 
        for (i=0; i<nrpeers; i++)
                if ( !peers[i].active )
@@ -328,6 +336,63 @@
 The time-out parameter is not used yet.
 @c
 
+static int admitSerialConn(void *cntxt, void *mb, void *stk, void *pci)
+{
+       str dburi;
+       int i, adm = 0;
+       MalStkPtr s = (MalStkPtr) stk;
+       InstrPtr p = (InstrPtr) pci;
+
+       (void) cntxt;
+       (void) mb;
+
+       if ( strncmp (getFunctionId(p), "exec", 4) == 0 ) 
+               dburi = *(str*)getArgReference(s, p,2);
+       else dburi = *(str*)getArgReference(s, p,1);
+       i = OCTOPUSfindPeer(dburi);
+
+       MT_set_lock(s->stklock,"serialConn");
+       if ( i >= 0 && !peers[i].inuse ) {
+               adm = peers[i].inuse = 1;
+#ifdef DEBUG_RUN_OCTOPUS
+               stream_printf( ((Client)cntxt)->fdout,"Conn. to peer %d (%s) in 
use\n", i, dburi);
+               printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, 
LIST_MAL_ALL);
+#endif
+       }
+       MT_unset_lock(s->stklock,"serialConn");
+
+       return adm;
+}
+
+static int wrapupSerialConn(void *cntxt, void *mb, void *stk, void *pci)
+{
+       str dburi;
+       int i;
+       MalStkPtr s = (MalStkPtr) stk;
+       InstrPtr p = (InstrPtr) pci;
+
+       (void) cntxt;
+       (void) mb;
+
+       if ( strncmp (getFunctionId(p), "exec", 4) == 0 ) 
+               dburi = *(str*)getArgReference(s, p,2);
+       else dburi = *(str*)getArgReference(s, p,1);
+       i = OCTOPUSfindPeer(dburi);
+
+       MT_set_lock(s->stklock,"serialConn");
+       if ( i >= 0 ) {
+               peers[i].inuse = 0;
+#ifdef DEBUG_RUN_OCTOPUS
+               stream_printf( ((Client)cntxt)->fdout,"Conn to peer %d (%s) 
released\n", i, dburi);
+               printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, 
LIST_MAL_ALL);
+#endif
+       }
+       MT_unset_lock(s->stklock,"serialConn");
+
+       return 0;
+}
+
+
 @-
 Discover available workers and register tentacles on them
 scheduler.register():bit;
@@ -383,7 +448,13 @@
        
        /* Register tentacle functions at peers */
        stop = j;
-       msg = runMALdataflow(cntxt,mb,start,stop,stk,0,pci);
+       if ( !octopusLocal ){   /*skip registration for local execution*/
+               stk->admit = &admitSerialConn;
+               stk->wrapup = &wrapupSerialConn;
+               msg = runMALdataflow(cntxt,mb,start,stop,stk,0,pci);
+               stk->admit = NULL;
+               stk->wrapup = NULL;
+       }
        *res = 0;
        return msg;
 }
@@ -432,8 +503,10 @@
 OCTOPUSbidding(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
     bit *res = (bit*) getArgReference(stk,pci,0);
-    int j, start;
+    int j, start, k;
     InstrPtr p;
+       lng *lbid;
+       str *wname;
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to