It was this check-in that broke the compilation in SQL.
Martin forgot to remove the calls to
OCTOPUSdrop

from sql/src/backend/monet5/sql.mx

Romulo

Martin Kersten wrote:
> Update of /cvsroot/monetdb/MonetDB5/src/scheduler
> In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv21275
> 
> Modified Files:
>       run_octopus.mx 
> Log Message:
> A new round for the octopus scheduler. Code still in testing phase.
> 
> 
> U run_octopus.mx
> Index: run_octopus.mx
> ===================================================================
> RCS file: /cvsroot/monetdb/MonetDB5/src/scheduler/run_octopus.mx,v
> retrieving revision 1.17
> retrieving revision 1.18
> diff -u -d -r1.17 -r1.18
> --- run_octopus.mx    29 Jan 2009 18:17:10 -0000      1.17
> +++ run_octopus.mx    13 Apr 2009 14:23:01 -0000      1.18
> @@ -30,17 +30,17 @@
>  re-directing requests to multiple sites. If there are no sites known,
>  then the code is executed linearly as is.
>  
> -The scheduler runs all tentacles asynchronously.
> +The scheduler runs all tentacles asynchronously if possible.
>  To make our live easier, we assume that all tentacles are
>  grouped together in a guarded block as follows:
>  
>  @verbatim
> -barrier (parallel,a):= scheduler.octopus(timeout);
> -a:= octopus.tentacle_1();
> +barrier (parallel,version):= scheduler.octopus(timeout);
> +a:= octopus.tentacle_1(sitename,fcnname,version);
>  ...
> -b:= octopus.tentacle_n();
> -a:= mat.pack(a,...,b);
> -exit (parallel,a);
> +b:= octopus.tentacle_n(sitename,fcnname,version);
> +exit (parallel,version);
> +z:= mat.pack(a,...,b);
>  @end verbatim
>  
>  This way the MAL flow of control simplifies skipping to the end
> @@ -50,27 +50,17 @@
>  Allowing MAL instructions inbetween complicates our work,
>  because it would mean that we have to do a flow analysis.
>  
> -To make this work the scheduler needs a list of database worker.
> -For the time being, this is an explicitly administered list here. 
> -When the octopus scheduling is called, we check the connection with
> -the remote site. If it is down, it is re-activated using Merovingian.
> -
> +To make this work the scheduler needs a list of databases to play with.
> +For the time being this consists of all the database known
> +and ending with the phrase 'sea'.
> +This list is obtained through the remote module using the
> +support of Merovingian. The default is to use the local
> +database as a target.
>  @{
>  @mal
> -pattern scheduler.octopus(timeout:int)(:bit, :bat[:any_1,:any_2])
> +pattern scheduler.octopus(t:int)(:bit,version:int)
>  address OCTOPUSrun
> -comment "Run the program block in parallel, but don't wait longer then t 
> seconds";
> -
> -pattern scheduler.worker(dbnme:str, usr:str, pw:str)
> -address OCTOPUSworker
> -comment "Add a new worker to the known list ";
> -pattern scheduler.worker(dbnme:str, usr:str, pw:str, host:str, port:int)
> -address OCTOPUSworker
> -comment "Add a worker site to the known list ";
> -
> -pattern scheduler.drop(dbnme:str)
> -address OCTOPUSdrop
> -comment "Remove a worker from the list";
> +comment "Run the program block in parallel, but don't wait longer then t 
> seconds. Also fix a consistent database version.";
>  @h
>  #ifndef _RUN_OCTOPUS
>  #define _RUN_OCTOPUS
> @@ -78,7 +68,7 @@
>  #include "mal_instruction.h"
>  #include "mal_client.h"
>  
> -/*#define DEBUG_RUN_OCTOPUS          to trace processing */
> +#define DEBUG_RUN_OCTOPUS            /* to trace processing */
>  
>  #ifdef WIN32
>  #ifndef LIBRUN_OCTOPUS
> @@ -91,8 +81,6 @@
>  #endif
>  
>  octopus_export str OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
> InstrPtr p);
> -octopus_export str OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
> InstrPtr p);
> -octopus_export str OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
> InstrPtr p);
>  #endif /* MAL_RUN_OCTOPUS */
>  
>  @+ Octopus scheduling implementation
> @@ -108,155 +96,156 @@
>  #include "remote.h"
>  #include "alarm.h"
>  
> -#define SITEasleep   0
> -#define SITElocal    1
> -#define SITEremote   2
> +typedef struct REGMAL{
> +     str fcn;
> +     struct REGMAL *nxt;
> +} *Registry;
>  
>  typedef struct {
> -     str alias;
> -     str db; /* connection parameters */
> +     str name;
>       str usr;
> -     str pw;
> -     str host;       /* used when merovigian is not running */
> -     int port;
> -     int status;     /* asleep, local, remote */
> -} Site;
> +     str pwd;
> +     Registry nxt;   /* list of registered queries */
> +} Sea;
>  
>  #define MAXSITES 2048        /* should become dynamic at some point */
> -static Site *sites;
> -static int nrsites = 0;
> +static Sea sea[MAXSITES];
> +static int nrsea = 0;
>  
>  static str
>  OCTOPUSdiscover(Client cntxt){
>       bat b1 = 0, b2 = 0;
> -     BAT *b;
> +     BAT *l1, *l2;
> +     BUN p,q;
>       str msg = MAL_SUCCEED;
> +     BATiter bi;
>  
> -     (void) cntxt;
> -     (void) b2;
> +     sea[nrsea].usr = GDKstrdup("monetdb");
> +     sea[nrsea].pwd = GDKstrdup("monetdb");
> +     sea[nrsea++].name= GDKstrdup(GDKgetenv("gdk_dbname"));
>       /* determine if sites are reachable */
>       msg = RMTgetList(&b1,&b2);
>       if ( msg != MAL_SUCCEED)
>               return msg;
> -     b = BATdescriptor(b1);
> -     if ( b == NULL)
> +     l1 = BATdescriptor(b1);
> +     if ( l1 == NULL)
>               throw(MAL,"octopus.discover","No database list available");
> -     BBPunfix(b1);
> +     l2 = BATdescriptor(b2);
> +     if ( l2 == NULL){
> +             BBPreleaseref(b1);
> +             throw(MAL,"octopus.discover","No database list available");
> +     }
> +     /* add the databases to the working set */
> +     bi= bat_iterator(l1);
> +     BATloop(l1,p,q){
> +             str t= (str) BUNtail(bi,p);
> +
> +             if (nrsea ==MAXSITES) break;
> +             if (strlen(t) >= 3 && strcmp("sea", t+strlen(t)-3) == 0){
> +                     sea[nrsea].usr = GDKstrdup("monetdb");
> +                     sea[nrsea].pwd = GDKstrdup("monetdb");
> +                     sea[nrsea++].name= GDKstrdup(t);
> +#ifdef DEBUG_RUN_OCTOPUS
> +             stream_printf(cntxt->fdout,"Found site %s\n",t);
> +#else
> +             (void) cntxt;
> +#endif
> +             }
> +     }
> +#ifdef DEBUG_RUN_OCTOPUS
> +             stream_printf(cntxt->fdout,"Seas %d\n",nrsea);
> +#endif
> +     BBPreleaseref(b1);
> +     BBPreleaseref(b2);
>       return MAL_SUCCEED;
>  }
>  
>  @-
> -The replica is identified by database name. The host and port
> -should address a merovingian to ensure the database instance is
> -started. The default is to contact the local merovingian at 
> -default port 50000.
> +We first register the tentacle at all sites and keep
> +a list of those already sent.
>  @c
> -str
> -OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> +static int
> +OCTOPUSfind(Sea s, str qry){
> +     Registry r;
> +     for ( r= s.nxt; r; r= r->nxt)
> +     if ( strcmp(qry, r->fcn)==0)
> +             return 1;
> +     return 0;
> +}
> +
> +...@-
> +The functions called by the octopus.exec_qry are to be
> +registered at all sites.
> +...@c
> +static str
> +OCTOPUSregister(Client cntxt, MalBlkPtr mb, InstrPtr p)
>  {
> -     int idx;
> +     int i;
> +     str conn, fname, msg = MAL_SUCCEED;
> +
> +     fname = getVarConstant(mb,getArg(p,2)).val.sval;
> +     for ( i= 0; i< nrsea; i++){
> +             msg= RMTconnect(&conn, &sea[i].name, &sea[i].usr, &sea[i].pwd);
> +             if (msg ){
> +                     stream_printf(cntxt->fdout,"!%s\n",msg);
> +                     GDKfree(msg);
> +                     msg = NULL;
> +                     continue;
> +             }
> +             if( !OCTOPUSfind(sea[i], fname) ){
> +                     msg = RMTregisterInternal(cntxt, conn, octopusRef, 
> fname);
>  
> -     (void) mb;
> -     if (nrsites == MAXSITES)
> -             throw(MAL,"scheduler.worker","Too many worker");
> -     mal_set_lock(mal_contextLock,"scheduler.worker");
> -     if (nrsites == 0)
> -             sites = (Site *) GDKzalloc(sizeof(Site) * MAXSITES);
> -     idx = nrsites++;
> -     sites[idx].alias = NULL;
> -     sites[idx].db = GDKstrdup(*(str*) getArgReference(stk,pci,1));
> -     sites[idx].usr = GDKstrdup(*(str*) getArgReference(stk,pci,2));
> -     sites[idx].pw = GDKstrdup(*(str*) getArgReference(stk,pci,3));
> -     if (pci->argc > 4){
> -             sites[idx].host = GDKstrdup(*(str*) getArgReference(stk,pci,4));
> -             sites[idx].port = *(int*) getArgReference(stk,pci,5);
> -     } else {
> -             sites[idx].host = GDKstrdup("localhost");
> -             sites[idx].port = 50000;
> -     }
> -     mal_unset_lock(mal_contextLock,"scheduler.worker");
>  #ifdef DEBUG_RUN_OCTOPUS
> -     stream_printf(cntxt->fdout,"# added worker %s %s %s %s\n", 
> -             sites[idx].alias, sites[idx].usr, sites[idx].pw);
> -             sites[idx].db, sites[idx].usr, sites[idx].pw);
> +                     stream_printf(cntxt->fdout,"octopus.%s registered at 
> site %s\n",
> +                             fname,sea[i].name);
> +                     stream_printf(cntxt->fdout,"reply: %s\n",msg?msg:"ok");
>  #else
> -     (void) cntxt;
> +                     (void) cntxt;
>  #endif
> -     return MAL_SUCCEED;
> -}
> -str
> -OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> -{
> -     int i,j;
> -     str alias = *(str*) getArgReference(stk,pci,1);
> -
> -     (void) cntxt;
> -     (void) mb;
> -     mal_set_lock(mal_contextLock,"scheduler.drop");
> -     for (i=j=0; i<nrsites; i++){
> -             if( strcmp(sites[i].alias, alias) ==0) {
> -                     GDKfree(sites[i].alias);
> -                     GDKfree(sites[i].db);
> -                     GDKfree(sites[i].usr);
> -                     GDKfree(sites[i].pw);
> -                     GDKfree(sites[i].host);
> -                     continue;
> +                     if ( msg == MAL_SUCCEED){
> +                             Registry r= (Registry) GDKzalloc(sizeof(struct 
> REGMAL));
> +                             r->fcn = GDKstrdup(getFunctionId(p));
> +                             r->nxt = sea[i].nxt;
> +                             sea[i].nxt = r;
> +                     }
>               }
> -             sites[j++] = sites[i];
>       }
> -     nrsites = j;
> -     mal_unset_lock(mal_contextLock,"scheduler.drop");
> -     if ( i == j )
> -             throw(MAL,"scheduler.drop","Site not found");
> -     return MAL_SUCCEED;
> +     GDKfree(conn);
> +     return msg;
>  }
>  @-
> -The policy to check for sites is a multiphase phase process.
> -First, we try to re-use a site where the operation was ran before.
> -If not available, we select a non-used worker.
> -If all this fails, we pick a random site to execute the plan.
> +The work division looks at the system opportunities and
> +replaces the target site in all instructions.
> +The first policy is to simply perform round robin.
> +The more advanced way is to negotiat with the remote sites.
>  @c
>  static str
> -OCTOPUSexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> +OCTOPUSworkdivision(Client cntxt, MalBlkPtr mb, int pc)
>  {
> -     int i=0, tries= nrsites * 2;
> +     static int rr=0;
>       str msg = MAL_SUCCEED;
> +     InstrPtr p;
> +     ValPtr cst;
>  
> -redo:
> +     for (; pc< mb->stop; pc++){
> +             if ( nrsea >1 && rr == 0) rr++; /* ignore default */
> +             p= getInstrPtr(mb,pc);
> +             if ( p->barrier == EXITsymbol)
> +                     break;
> +             assert( isVarConstant(mb, getArg(p,1)) );
> +             cst = &getVarConstant(mb, getArg(p,1));
> +             if( cst->val.sval)
> +                     GDKfree(cst->val.sval);
> +             cst->val.sval= GDKstrdup(sea[rr].name);
> +             cst->len = strlen(cst->val.sval);
>  #ifdef DEBUG_RUN_OCTOPUS
> -     stream_printf(cntxt->fdout,"octopus.exec site selected %d\n",i);
> +             stream_printf(cntxt->fdout,"octopus site selected 
> %s\n",sea[rr].name);
> +             printInstruction(cntxt->fdout,mb,0,p,LIST_MAL_STMT);
>  #else
> -     (void) cntxt;
> -#endif
> -
> -     /* register the plan remotely */
> -     msg = RMTregisterInternal(cntxt, sites[i].alias, 
> -             getModuleId(pci), getFunctionId(pci));
> -
> -     /* ignore a duplicate definition */
> -     if (msg != MAL_SUCCEED && !strstr(msg,"Function already defined")){
> -#ifdef DEBUG_RUN_OCTOPUS
> -             stream_printf(cntxt->fdout,"octopus.exec failed to register 
> plan %s.%s at site %s\n",getModuleId(pci),getFunctionId(pci),sites[i].alias);
> -             stream_printf(cntxt->fdout,"reply: %s\n",msg);
> -#endif
> -             if (--tries <= 0)
> -                     return msg;
> -             goto redo;
> -     }
> -
> -     /* execute the plan as an independent process thread if it is local*/
> -     /* otherwise activate it on the remote site passing parameters as well 
> */
> -     msg =runMALprocess(cntxt,mb,stk, getPC(mb,pci), getPC(mb,pci)+1);
> -     if ( msg != MAL_SUCCEED){
> -#ifdef DEBUG_RUN_OCTOPUS
> -             stream_printf(cntxt->fdout,"octopus.exec failed to run remote 
> plan\n");
> +             (void) cntxt;
>  #endif
> -             if (--tries <= 0)
> -                     return msg;
> -             goto redo;
> +             rr= (rr+1) % nrsea;
>       }
> -
> -     /* if it fails, we need to find another site */
>       return msg;
>  }
>  @-
> @@ -270,37 +259,52 @@
>  We should be careful in accessing a site that runs out
>  of clients or any failure. It may cause the scheduler to
>  wait forever.
> +
> +The database version should indicate to the tentacles
> +if it is time to refresh their caches. 
> +It should be obtained from the recycler where we
> +know when updates have been taken place.
>  @c
>  str
>  OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
>  {
>       int *res = (int*) getArgReference(stk,p,0);
> +     int *version = (int*) getArgReference(stk,p,1);
>       int timeout = *(int*) getArgReference(stk,p,2);
> -     int j,fnd, i = getPC(mb,p);
> +     int j,fnd, i = getPC(mb,p), threadcnt=0;
>       str msg = MAL_SUCCEED;
>       *res = 0;       /* skip the block */
>  
> -     if ( OCTOPUSdiscover(cntxt) == 0 ){
> +     *version = 0;
> +
> +     if ( (msg= OCTOPUSdiscover(cntxt))  ){
>  #ifdef DEBUG_RUN_OCTOPUS
>               stream_printf(cntxt->fdout,"#Run in local serial mode\n");
>  #endif
>               *res = 1;
> -             return MAL_SUCCEED; 
> +             return msg; 
> +     }
> +...@-
> +Register the tentacle functions at all sites.
> +...@c
> +     if (nrsea > 1) {
> +             for (j= i+1; j<mb->stop ; j++){
> +                     p= getInstrPtr(mb,j);
> +                     if ( p->barrier == EXITsymbol )
> +                             break;
> +                     msg= OCTOPUSregister(cntxt,mb,p);
> +                     if ( msg )
> +                             return msg;
> +             }
>       }
> +     msg= OCTOPUSworkdivision(cntxt,mb,i+1);
> +     if ( msg )
> +             return msg;
> +
>       /* do the actual parallel work */
>       for (i++; i<mb->stop && msg == MAL_SUCCEED; i++){
>               p= getInstrPtr(mb,i);
> -             /* don't do it remote if we need arguments */
> -             if ( p->retc != p->argc){
> -#ifdef DEBUG_RUN_OCTOPUS
> -             stream_printf(cntxt->fdout,"#Run in local serial mode due to 
> arguments\n");
> -#endif
> -                     *res = 1;
> -                     return MAL_SUCCEED; 
> -             }
> -             if ( p->barrier == EXITsymbol )
> -                     break;
> -             if ( getModuleId(p) == matRef && getFunctionId(p) == packRef){
> +             if ( p->barrier == EXITsymbol ){
>                       /* collect the results */
>                       do{
>                               fnd = 0;
> @@ -311,10 +315,10 @@
>  #endif
>                               MT_sleep_ms(1000);
>                               timeout--;
> -                     } while ( fnd < p->argc-3 && timeout > 0 );
> +                     } while ( fnd < threadcnt && timeout > 0 );
>                       if (timeout <= 0)
> -                             throw(MAL,"scheduler.pack","Execution time 
> out");
> -                     return MATpackInternal(stk,p,1);
> +                             throw(MAL,"scheduler.octopus","Execution time 
> out");
> +                     break;
>               }
>               if ( getModuleId(p) != octopusRef)
>                       throw(MAL,"scheduler.octopus","tentacle expected");
> @@ -323,7 +327,8 @@
>  #else
>       (void) cntxt;
>  #endif
> -             msg = OCTOPUSexec(cntxt,mb,stk,p);
> +             msg =runMALprocess(cntxt,mb,stk, getPC(mb,p), getPC(mb,p)+1);
> +             threadcnt++;
>       }
>       return msg; 
>  }
> 
> 
> ------------------------------------------------------------------------------
> This SF.net email is sponsored by:
> High Quality Requirements in a Collaborative Environment.
> Download a free trial of Rational Requirements Composer Now!
> http://p.sf.net/sfu/www-ibm-com
> _______________________________________________
> Monetdb-checkins mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/monetdb-checkins

------------------------------------------------------------------------------
This SF.net email is sponsored by:
High Quality Requirements in a Collaborative Environment.
Download a free trial of Rational Requirements Composer Now!
http://p.sf.net/sfu/www-ibm-com
_______________________________________________
Monetdb-developers mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-developers

Reply via email to