Changeset: e769960272f9 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e769960272f9
Modified Files:
        monetdb5/optimizer/opt_partition.mx
Branch: default
Log Message:

Derive largest subplan
The query plan is analysed for the largest fragment that neither
requires data exchange nor is used in a blocking operator.


diffs (222 lines):

diff --git a/monetdb5/optimizer/opt_partition.mx 
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -627,27 +627,188 @@ remapVariable( MalBlkPtr mb, int i, int 
 }
 
 /* 
- * take a multi-assignement and resolve the distributed plan
- * for all right-hand arguments
+ * take a multi-assignment for a specific instruction and resolve the distributed plan
+ * for all right-hand arguments.
+ * But first determine all instructions that might be needed afterwards and retain them
 */
 static int
 OPTcollect(Client cntxt, MalBlkPtr mb, int pc, Slices *slices)
 {
        InstrPtr p;
        int  parallel;
+       Lifespan span = setLifespan(mb);
 
        p = getInstrPtr(mb, pc);
        assert(p->token == ASSIGNsymbol);
+       /* locate all variables used beyond this point but introduced before */
        if( (parallel = OPTparallelcode(cntxt, mb, pc, slices) ) ) {
-/*
-               q= getInstrPtr(mb,0);
-               for( j = q->retc; j < q->argc; j++)
-                       mb->stmt[pc] = pushArgument(mb, mb->stmt[pc], 
getArg(q,j));
-*/
        } /* else no parallelism, just a multi-assignment */
+       GDKfree(span);
        return parallel;
 } 
 
+/*
+ * OPTplanFragment analyses the plan in mb for the maximal subplan that
+ * involves the partitioned table described by slices and that does not
+ * pass through a blocking instruction (i.e. needs no data exchange).
+ * That portion is copied into a new function named "<function>_plan" in
+ * the user module, for possibly remote execution.
+ *
+ * The analysis keeps one marker per instruction (plan[]) and one per
+ * variable (vars[]):
+ *   BLOCKED   produced by, or dependent on, a blocking instruction
+ *   REQUIRED  part of the fragment to be extracted
+ * EXPORTED and NEEDED are defined but not used by this function.
+ *
+ * cntxt   client context; its namespace receives the new function and,
+ *         in debug builds, its output stream receives the trace
+ * mb      the plan to analyse; it is only read here
+ * slices  schema/table (and column) of the partitioned table
+ *
+ * Returns 0 on every path, including allocation failures; the new
+ * function is only inserted into the namespace when nmb->errors is 0.
+ */
+#define BLOCKED 1
+#define REQUIRED 2
+#define EXPORTED 3
+#define NEEDED 4
+static int
+OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
+{
+       char *plan = NULL, *vars = NULL;
+       int i, j, k, limit, last;
+       InstrPtr ret, p, *old;
+       Symbol s;
+       MalBlkPtr nmb;
+#ifdef _DEBUG_OPT_PARTITION_
+       str msg;        /* only the debug build runs the optimizer on the fragment */
+#endif
+       char nme[BUFSIZ];
+
+       plan = GDKzalloc(mb->ssize);
+       if (plan == 0)
+               goto cleanup;
+       vars = GDKzalloc(mb->vsize);
+       if (vars == 0)
+               goto cleanup;
+
+       /* create the "<function>_plan" symbol and give it a private copy of mb */
+       snprintf(nme,BUFSIZ,"%s_plan",getFunctionId( getInstrPtr(mb,0)));
+       s = newFunction(userRef, putName(nme, strlen(nme)),FUNCTIONsymbol);
+       if (s == NULL)
+               goto cleanup;   /* previously leaked plan and vars */
+       freeMalBlk(s->def);
+       s->def = copyMalBlk(mb);
+       nmb = s->def;
+       getFunctionId( getInstrPtr(nmb,0)) = putName(nme,strlen(nme));
+
+       /* keep the copied statements in old[] and start nmb with an empty list */
+       limit = nmb->stop;
+       old = nmb->stmt;
+       /* NOTE(review): s is not released on this failure path -- confirm ownership */
+       if (newMalBlkStmt(nmb,nmb->ssize) < 0)
+               goto cleanup;   /* previously leaked plan and vars */
+
+#ifdef _DEBUG_OPT_PARTITION_
+       mnstr_printf(cntxt->fdout,"#Remote plan framework\n");
+       mnstr_printf(cntxt->fdout,"#partition %s.%s.%s type %d\n",
+               slices->schema,
+               slices->table,
+               (slices->column ? slices->column: ""),
+               slices->type);
+#else
+       (void) slices;
+#endif
+
+       /* Phase 1: forward pass; determine all variables/instructions that
+          depend, directly or indirectly, on a fragmented column
+       */
+       last = limit;
+       for (i = 0; i < limit; i++) {
+               p = old[i];
+               if (p->token == ENDsymbol || i > last) {
+                       /* the END instruction and everything after it is kept */
+                       plan[i] = REQUIRED;
+                       last = i;
+               } else if (getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef) &&
+                       strcmp(slices->schema, getVarConstant(mb, getArg(p,2)).val.sval) == 0 &&
+                       strcmp(slices->table, getVarConstant(mb, getArg(p,3)).val.sval) == 0) {
+                       /* a bind on the partitioned table seeds the fragment */
+                       vars[getArg(p,0)] = REQUIRED;
+                       plan[i] = REQUIRED;
+               } else {
+                       /* all arguments should be free to use in distributed setting */
+                       for (j = p->retc; j < p->argc; j++) {
+                               if (vars[getArg(p,j)] == BLOCKED)
+                                       plan[i] = BLOCKED;
+                       }
+               }
+
+               /* blocking instructions; this overrides any marker set above */
+               if ((getModuleId(p) == groupRef && (getFunctionId(p) == doneRef || getFunctionId(p) == newRef || getFunctionId(p) == deriveRef)) ||
+                       getModuleId(p) == pqueueRef || getModuleId(p) == aggrRef || getModuleId(p) == ioRef ||
+                       (getModuleId(p) == sqlRef && (getFunctionId(p) == resultSetRef || getFunctionId(p) == putName("exportValue",11))) ||
+                       (getModuleId(p) == algebraRef && (getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef))) {
+                       plan[i] = BLOCKED;
+               }
+
+               if (plan[i] == BLOCKED) {
+                       /* results of a blocked instruction are blocked as well */
+                       for (j = 0; j < p->retc; j++)
+                               vars[getArg(p,j)] = BLOCKED;
+               } else {
+                       /* any REQUIRED argument (results included) pulls the instruction in */
+                       for (j = 0; j < p->argc; j++) {
+                               if (vars[getArg(p,j)] == REQUIRED)
+                                       break;
+                       }
+                       if (j != p->argc) {
+                               for (j = 0; j < p->retc; j++)
+                                       vars[getArg(p,j)] = REQUIRED;
+                               plan[i] = REQUIRED;
+                       }
+               }
+       }
+
+       /* Phase 2: backward pass; determine all variables/instructions
+          contributing to the REQUIRED ones */
+#ifdef _DEBUG_OPT_PARTITION_
+       mnstr_printf(cntxt->fdout,"#phase 2\n");
+#endif
+       for (i = limit - 1; i >= 0; i--) {
+               if (plan[i] == BLOCKED)
+                       continue;
+               p = old[i];
+               for (j = 0; j < p->argc; j++) {
+                       if (vars[getArg(p,j)] == REQUIRED)
+                               plan[i] = REQUIRED;
+               }
+               if (plan[i] == REQUIRED) {
+                       /* the inputs of a required instruction are required too */
+                       for (j = p->retc; j < p->argc; j++)
+                               vars[getArg(p,j)] = REQUIRED;
+               }
+       }
+
+       /* Phase 3: determine all variables to be exported: BAT-typed REQUIRED
+          variables that are consumed by an instruction outside the fragment */
+#ifdef _DEBUG_OPT_PARTITION_
+       mnstr_printf(cntxt->fdout,"#phase 3\n");
+#endif
+       ret = newInstruction(nmb,ASSIGNsymbol);
+       ret->barrier = RETURNsymbol;
+       ret->argc = ret->retc = 0;
+       for (i = 0; i < limit; i++) {
+               if (plan[i] == REQUIRED)
+                       continue;
+               p = old[i];
+               for (j = p->retc; j < p->argc; j++) {
+                       if (vars[getArg(p,j)] == REQUIRED && isaBatType(getArgType(nmb,p,j))) {
+                               /* add each exported variable only once */
+                               for (k = 0; k < ret->retc; k++) {
+                                       if (getArg(ret,k) == getArg(p,j))
+                                               break;
+                               }
+                               if (k == ret->retc)
+                                       ret = pushReturn(nmb,ret, getArg(p,j));
+                       }
+               }
+       }
+
+       /* Phase 4: bake a new function that produces them; the return
+          instruction is placed right before the END instruction */
+       /* NOTE(review): old[] and the instructions it holds are not released
+          after this phase -- confirm whether they should be freed here */
+       for (i = 0; i < limit; i++) {
+               if (plan[i] != REQUIRED)
+                       continue;
+               p = copyInstruction(getInstrPtr(mb, i));
+               if (old[i]->token == ENDsymbol)
+                       pushInstruction(nmb,ret);
+               pushInstruction(nmb,p);
+               if (getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef))
+                       OPTsliceColumn(cntxt, nmb, mb, p, slices, nmb->stop-1);
+       }
+
+       /* fix the signature: its results become the exported variables */
+       while (nmb->stmt[0]->retc)
+               delArgument(nmb->stmt[0],0);
+       for (i = 0; i < ret->retc; i++)
+               nmb->stmt[0] = pushReturn(nmb, nmb->stmt[0], getArg(ret,i));
+
+#ifdef _DEBUG_OPT_PARTITION_
+       mnstr_printf(cntxt->fdout,"#Remote plan framework\n");
+       printFunction(cntxt->fdout, nmb, 0, LIST_MAL_STMT);
+       /* NOTE(review): msg is not released after printing -- confirm ownership */
+       msg = optimizeMALBlock(cntxt, nmb);
+       chkProgram(cntxt->nspace, nmb);
+       mnstr_printf(cntxt->fdout,"#partition error %d %s\n", nmb->errors, msg?msg:"");
+       printFunction(cntxt->fdout, nmb, 0, LIST_MAL_STMT);
+#endif
+       if (nmb->errors == 0)
+               insertSymbol(cntxt->nspace,s);
+
+cleanup:
+       /* single exit: the marker arrays are released on every path */
+       if (plan)
+               GDKfree(plan);
+       if (vars)
+               GDKfree(vars);
+       return 0;
+}
+
+
 /*
  * The general tactic is to identify instructions that are blocked in a 
distributed setting.
  * For those instruction we inject a multi-assignment to map is arguments to 
new variables
@@ -716,8 +877,10 @@ OPTpartitionImplementation(Client cntxt,
                                getVarConstant(mb, 
getArg(slices.target,3)).val.sval,
                                rowcnt, nrpeers);
 
-
-
+       /* derive a local plan based on forward flow reasoning */
+       if( OPTplanFragment(cntxt, mb, &slices)== 0) {
+               /* bake the controller function */
+       } else
        for(i=0; i < mb->stop; i++){
                p= getInstrPtr(mb,i);
 
@@ -758,7 +921,8 @@ OPTpartitionImplementation(Client cntxt,
 
                /* grouping and aggregation are blocking instruction for the 
time being */
                if (  (getModuleId(p) == groupRef && (getFunctionId(p) == 
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) )  ||
-                               getModuleId(p) == aggrRef ) {
+                               getModuleId(p) == pqueueRef || getModuleId(p) 
== aggrRef ||
+                               (getModuleId(p) == algebraRef 
&&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef)) ) {
                        rsset= newInstruction(mb,ASSIGNsymbol);
                        for ( j = p->retc; j < p->argc; j++)  {
                                for ( k = p->retc; k < j; k++)
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to