Changeset: 849d25275eca for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=849d25275eca Modified Files: MonetDB5/src/mal/mal_interpreter.mx MonetDB5/src/mal/mal_recycle.mx MonetDB5/src/mal/mal_stack.mx MonetDB5/src/optimizer/opt_octopus.mx MonetDB5/src/scheduler/run_octopus.mx Branch: default Log Message:
1) Fixed local execution with octopus; 2) Refined scheduling of parallel blocks using stack hooks. In particular, serialize calls to the same peer inside of a parallel block since the same TCP/IP connection is used. diffs (truncated from 342 to 300 lines): diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/mal/mal_interpreter.mx --- a/MonetDB5/src/mal/mal_interpreter.mx Mon Jul 05 16:27:13 2010 +0200 +++ b/MonetDB5/src/mal/mal_interpreter.mx Mon Jul 05 17:25:47 2010 +0200 @@ -1349,10 +1349,10 @@ for(i = limit-1; i >= pc ; i--) if (fs[i].status == DFLOWpending ) { p = getInstrPtr(flow->mb, fs[i].pc); + if(flow->stk->admit == 0 || (*flow->stk->admit)(flow->cntxt, flow->mb, flow->stk, p) ) + for ( j= p->retc; j < p->argc; j++) - if ( getArg(p,j)== oa && DFLOWeligible(flow,fs,i,p,pc) && - (f == 0 || f->flow->stk->admit == 0 || (*f->flow->stk->admit)(f->flow->cntxt, f->flow->mb, f->flow->stk, getInstrPtr(flow->mb, f->pc)) ) - ){ + if ( getArg(p,j)== oa && DFLOWeligible(flow,fs,i,p,pc) ){ queued++; candidates ++; DFLOWactivate(flow,fs,i,p); diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/mal/mal_recycle.mx --- a/MonetDB5/src/mal/mal_recycle.mx Mon Jul 05 16:27:13 2010 +0200 +++ b/MonetDB5/src/mal/mal_recycle.mx Mon Jul 05 17:25:47 2010 +0200 @@ -1037,7 +1037,7 @@ lng cacheLimit; QryStatPtr qsp = recycleQPat->ptrn[cntxt->rcc->curQ]; - (void) rd; +/* (void) rd; */ RECYCLEspace(); cacheLimit = recycleCacheLimit?recycleCacheLimit:HARDLIMIT_STMT; if ( recycleSize >= cacheLimit) @@ -1094,10 +1094,11 @@ /* use the field to refer to the query-owner index in the query pattern table */ pushInstruction(recycleBlk,q); i = recycleBlk->stop-1; - recycleBlk->profiler[i].rbytes = recycleBlk->profiler[i].clk = GDKusec(); +/* recycleBlk->profiler[i].rbytes = recycleBlk->profiler[i].clk = GDKusec(); */ + recycleBlk->profiler[i].clk = GDKusec(); recycleBlk->profiler[i].counter =1; recycleBlk->profiler[i].ticks = ticks; -/* recycleBlk->profiler[i].rbytes = rd; */ + recycleBlk->profiler[i].rbytes = rd; recycleBlk->profiler[i].wbytes = wr; recyclerUsedMemory += wr; if (monitorRecycler == 1 ) diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/mal/mal_stack.mx --- a/MonetDB5/src/mal/mal_stack.mx Mon Jul 05 16:27:13 2010 +0200 +++ b/MonetDB5/src/mal/mal_stack.mx Mon Jul 05 17:25:47 2010 +0200 @@ -69,6 +69,7 @@ #define MAXSHARES 8 typedef str (*MALfcn) (); +typedef int (*DFhook) (void *, void *, void *, void *); typedef struct MALSTK { int stksize; @@ -83,8 +84,8 @@ there may be different schemes to take instructions into execution. The admission scheme (and wrapup) are the necessary scheduler hooks. @h - MALfcn admit; - MALfcn wrapup; + DFhook admit; + DFhook wrapup; MT_Lock stklock; /* used for parallel processing */ @- It is handy to administer the timing in the stack frame diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/optimizer/opt_octopus.mx --- a/MonetDB5/src/optimizer/opt_octopus.mx Mon Jul 05 16:27:13 2010 +0200 +++ b/MonetDB5/src/optimizer/opt_octopus.mx Mon Jul 05 17:25:47 2010 +0200 @@ -237,8 +237,9 @@ pushArgument(mb, p, varid); p->barrier = RETURNsymbol; pushEndInstruction(mb); - newStmt(mb, optimizerRef, putName("aliases", 7)); - newStmt(mb, optimizerRef, putName("deadcode", 8)); + p = newStmt(mb, optimizerRef, putName("aliases", 7)); + p = pushStr(mb, p, octopusRef); + p = pushStr(mb, p, getFunctionId(getInstrPtr(mb, 0))); } static MalBlkPtr diff -r 1436cc5f795a -r 849d25275eca MonetDB5/src/scheduler/run_octopus.mx --- a/MonetDB5/src/scheduler/run_octopus.mx Mon Jul 05 16:27:13 2010 +0200 +++ b/MonetDB5/src/scheduler/run_octopus.mx Mon Jul 05 17:25:47 2010 +0200 @@ -88,7 +88,7 @@ #include "mal_instruction.h" #include "mal_client.h" -#define DEBUG_RUN_OCTOPUS /* to trace processing */ +/*#define DEBUG_RUN_OCTOPUS to trace processing */ #ifdef WIN32 #ifndef LIBRUN_OCTOPUS @@ -133,6 +133,7 @@ Registry nxt; /* list of registered mal functions */ bte active; str conn; + int inuse; } Peer; typedef struct { @@ -150,6 +151,7 @@ static Worker workers[MAXSITES]; /* registry of workers for the current query */ static int nrworkers = 0; static int nrpeers=0; +static bte octopusLocal=0; bte optTarget = 1; /*sht bidStrategy = 1; #define BID_TRANS 1 @@ -185,7 +187,8 @@ peers[i].uri = GDKstrdup(uri); peers[i].pwd = GDKstrdup("monetdb"); peers[i].active = 1; - peers[i].nxt = NULL; + peers[i].nxt = NULL; + peers[i].inuse = 0; nrpeers++; return i; } @@ -223,6 +226,10 @@ msg = RMTconnect(&conn, &peers[i].uri, &peers[i].usr, &peers[i].pwd); if ( msg == MAL_SUCCEED ) peers[i].conn = GDKstrdup(conn); + else { + *c = NULL; + return msg; + } } *c = GDKstrdup(peers[i].conn); @@ -239,51 +246,52 @@ char buf[BUFSIZ]= "*/octopus", *s= buf; int i; - if (nrworkers ==MAXSITES) - throw(MAL,"octopus.discover",OPERATION_FAILED "No octopus worker left"); - nrworkers = 0; + octopusLocal = 0; for (i=0; i<nrpeers; i++) peers[i].active = 0; msg = RMTresolve(&bid,&s); - if ( msg != MAL_SUCCEED){ - /* there is a last resort, yourself */ - + if ( msg == MAL_SUCCEED) { + b = BATdescriptor(bid); + if ( b != NULL && BATcount(b) > 0 ) { + bi = bat_iterator(b); + BATloop(b,p,q){ + str t= (str) BUNtail(bi,p); + + workers[nrworkers].pnum = OCTOPUSgetPeer(t); /*ref to peers registry*/ + snprintf(buf,BUFSIZ,"worker_%d",nrworkers); + workers[nrworkers].name = GDKstrdup(buf); + +#ifdef DEBUG_RUN_OCTOPUS + stream_printf(cntxt->fdout,"Worker site %d %s\n", nrworkers, t); +#endif + nrworkers++; + } + } + BBPreleaseref(bid); + } + + if ( !nrworkers ) { + /* there is a last resort, local execution */ SABAOTHgetLocalConnection(&s); + workers[nrworkers].pnum = OCTOPUSgetPeer(s); /*ref to peers registry*/ snprintf(buf,BUFSIZ,"worker_%d",nrworkers); workers[nrworkers].name = GDKstrdup(buf); #ifdef DEBUG_RUN_OCTOPUS stream_printf(cntxt->fdout,"Worker site %d %s\n", nrworkers, s); +#endif + nrworkers++; + octopusLocal = 1; + } + +#ifdef DEBUG_RUN_OCTOPUS + stream_printf(cntxt->fdout,"Octopus workers %d\n",nrworkers); #else (void) cntxt; #endif - nrworkers++; - return MAL_SUCCEED; - } - b = BATdescriptor(bid); - if ( b == NULL) - throw(MAL,"octopus.discover",OPERATION_FAILED "No octopus worker list available"); - - bi= bat_iterator(b); - BATloop(b,p,q){ - str t= (str) BUNtail(bi,p); - - workers[nrworkers].pnum = OCTOPUSgetPeer(t); /*ref to peers registry*/ - snprintf(buf,BUFSIZ,"worker_%d",nrworkers); - workers[nrworkers].name = GDKstrdup(buf); -#ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"Worker site %d %s\n", nrworkers, t); -#endif - nrworkers++; - } -#ifdef DEBUG_RUN_OCTOPUS - stream_printf(cntxt->fdout,"Octopus workers %d\n",nrworkers); -#endif - - BBPreleaseref(bid); for (i=0; i<nrpeers; i++) if ( !peers[i].active ) @@ -328,6 +336,63 @@ The time-out parameter is not used yet. @c +static int admitSerialConn(void *cntxt, void *mb, void *stk, void *pci) +{ + str dburi; + int i, adm = 0; + MalStkPtr s = (MalStkPtr) stk; + InstrPtr p = (InstrPtr) pci; + + (void) cntxt; + (void) mb; + + if ( strncmp (getFunctionId(p), "exec", 4) == 0 ) + dburi = *(str*)getArgReference(s, p,2); + else dburi = *(str*)getArgReference(s, p,1); + i = OCTOPUSfindPeer(dburi); + + MT_set_lock(s->stklock,"serialConn"); + if ( i >= 0 && !peers[i].inuse ) { + adm = peers[i].inuse = 1; +#ifdef DEBUG_RUN_OCTOPUS + stream_printf( ((Client)cntxt)->fdout,"Conn. to peer %d (%s) in use\n", i, dburi); + printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, LIST_MAL_ALL); +#endif + } + MT_unset_lock(s->stklock,"serialConn"); + + return adm; +} + +static int wrapupSerialConn(void *cntxt, void *mb, void *stk, void *pci) +{ + str dburi; + int i; + MalStkPtr s = (MalStkPtr) stk; + InstrPtr p = (InstrPtr) pci; + + (void) cntxt; + (void) mb; + + if ( strncmp (getFunctionId(p), "exec", 4) == 0 ) + dburi = *(str*)getArgReference(s, p,2); + else dburi = *(str*)getArgReference(s, p,1); + i = OCTOPUSfindPeer(dburi); + + MT_set_lock(s->stklock,"serialConn"); + if ( i >= 0 ) { + peers[i].inuse = 0; +#ifdef DEBUG_RUN_OCTOPUS + stream_printf( ((Client)cntxt)->fdout,"Conn to peer %d (%s) released\n", i, dburi); + printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, LIST_MAL_ALL); +#endif + } + MT_unset_lock(s->stklock,"serialConn"); + + return 0; +} + + @- Discover available workers and register tentacles on them scheduler.register():bit; @@ -383,7 +448,13 @@ /* Register tentacle functions at peers */ stop = j; - msg = runMALdataflow(cntxt,mb,start,stop,stk,0,pci); + if ( !octopusLocal ){ /*skip registration for local execution*/ + stk->admit = &admitSerialConn; + stk->wrapup = &wrapupSerialConn; + msg = runMALdataflow(cntxt,mb,start,stop,stk,0,pci); + stk->admit = NULL; + stk->wrapup = NULL; + } *res = 0; return msg; } @@ -432,8 +503,10 @@ OCTOPUSbidding(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { bit *res = (bit*) getArgReference(stk,pci,0); - int j, start; + int j, start, k; InstrPtr p; + lng *lbid; + str *wname; _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list