Changeset: e769960272f9 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e769960272f9
Modified Files:
monetdb5/optimizer/opt_partition.mx
Branch: default
Log Message:
Derive largest subplan
The query plan is analysed for the largest fragment that
neither requires data exchange nor is used in a blocking operator.
diffs (222 lines):
diff --git a/monetdb5/optimizer/opt_partition.mx
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -627,27 +627,188 @@ remapVariable( MalBlkPtr mb, int i, int
}
/*
- * take a multi-assignement and resolve the distributed plan
- * for all right-hand arguments
+ * Take a multi-assignment for a specific instruction and resolve the distributed plan
+ * for all right-hand arguments.
+ * But first determine all instructions that might be needed afterwards and retain them.
+ *
+ * The instruction at position pc must be an ASSIGNsymbol (asserted below).
+ * The value returned is the result of OPTparallelcode for that instruction.
+ * NOTE(review): the Lifespan obtained from setLifespan is released before returning,
+ * but the code shown here does not consult it yet.
*/
static int
OPTcollect(Client cntxt, MalBlkPtr mb, int pc, Slices *slices)
{
InstrPtr p;
int parallel;
+ Lifespan span = setLifespan(mb);
p = getInstrPtr(mb, pc);
assert(p->token == ASSIGNsymbol);
+ /* TODO: locate all variables used beyond this point but introduced before */
if( (parallel = OPTparallelcode(cntxt, mb, pc, slices) ) ) {
-/*
- q= getInstrPtr(mb,0);
- for( j = q->retc; j < q->argc; j++)
- mb->stmt[pc] = pushArgument(mb, mb->stmt[pc],
getArg(q,j));
-*/
} /* else no parallelism, just a multi-assignment */
+ GDKfree(span);
return parallel;
}
+/*
+ * The plan is analysed for the maximal subplan that involves a partitioned table
+ * and that does not require data exchanges.
+ * This portion is extracted for possibly remote execution.
+ *
+ * A function symbol named <function>_plan is created (newFunction with userRef)
+ * around a copy of mb. Its statement list is rebuilt in four phases and the symbol
+ * is inserted into the client namespace only when the new block reports no errors.
+ *
+ * Two byte maps drive the analysis: plan[] is indexed by instruction position and
+ * vars[] by variable index. BLOCKED marks entries that are kept out of the fragment,
+ * REQUIRED marks entries that belong to it. EXPORTED and NEEDED are defined but not
+ * referenced by the code below.
+ *
+ * The function returns 0 on every path, including the failure exits.
+ *
+ * NOTE(review): the early returns after newFunction and newMalBlkStmt do not release
+ * plan and vars.
+ * NOTE(review): the "#phase 2" and "#phase 3" messages are printed outside the
+ * _DEBUG_OPT_PARTITION_ guard.
+ * NOTE(review): msg is only assigned inside the debug section, and the saved
+ * statement array old is not released in the code shown.
+*/
+#define BLOCKED 1
+#define REQUIRED 2
+#define EXPORTED 3
+#define NEEDED 4
+static int
+OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
+{
+ char *plan,*vars;
+ int i, j, k, limit, last;
+ InstrPtr ret, p, *old;
+ Symbol s;
+ MalBlkPtr nmb;
+ str msg;
+ char nme[BUFSIZ];
+
+ plan = GDKzalloc(mb->ssize);
+ if( plan == 0)
+ return 0;
+ vars = GDKzalloc(mb->vsize);
+ if( vars == 0){
+ GDKfree(plan);
+ return 0;
+ }
+
+ snprintf(nme,BUFSIZ,"%s_plan",getFunctionId( getInstrPtr(mb,0)));
+ s = newFunction(userRef, putName(nme, strlen(nme)),FUNCTIONsymbol);
+ if ( s == NULL)
+ return 0;
+ freeMalBlk(s->def);
+ s->def = copyMalBlk(mb);
+ nmb = s->def;
+ getFunctionId( getInstrPtr(nmb,0)) = putName(nme,strlen(nme));
+
+ limit = nmb->stop;
+ old = nmb->stmt;
+ if ( newMalBlkStmt(nmb,nmb->ssize) < 0 )
+ return 0;
+
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"#Remote plan framework\n");
+ mnstr_printf(cntxt->fdout,"#partition %s.%s.%s type %d\n",
+ slices->schema,
+ slices->table,
+ (slices->column ? slices->column: ""),
+ slices->type);
+#else
+ (void) slices;
+#endif
+
+ /* Phase 1 (forward scan): determine all variables/instructions indirectly dependent
+ on a fragmented column.
+ The END instruction and everything after it are REQUIRED. A sql.bind or
+ sql.bindidx on the schema/table named in slices seeds the REQUIRED set.
+ */
+ last = limit;
+ for ( i = 0; i < limit ; i++) {
+ p = old[i];
+ if ( p->token == ENDsymbol || i > last) {
+ plan[i] = REQUIRED;
+ last = i;
+ } else
+ if ( getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef
|| getFunctionId(p) == bindidxRef) &&
+ strcmp(slices->schema, getVarConstant(mb,
getArg(p,2)).val.sval) == 0 &&
+ strcmp(slices->table, getVarConstant(mb,
getArg(p,3)).val.sval) == 0 ) {
+ vars[getArg(p,0)] = REQUIRED;
+ plan[i] = REQUIRED;
+ } else
+ /* all arguments should be free to use in a distributed setting;
+ an instruction reading a BLOCKED variable becomes BLOCKED itself */
+ for( j = p->retc; j < p->argc; j++)
+ if (vars[getArg(p,j)] == BLOCKED)
+ plan[i] = BLOCKED;
+
+ /* blocking instructions: group.done/new/derive, everything in pqueue, aggr and io,
+ sql.resultSet/exportValue and algebra.slice/join */
+ if ( (getModuleId(p) == groupRef && (getFunctionId(p) ==
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) ) ||
+ getModuleId(p) == pqueueRef || getModuleId(p)
== aggrRef || getModuleId(p) == ioRef ||
+ (getModuleId(p) == sqlRef && (getFunctionId(p)
== resultSetRef || getFunctionId(p) == putName("exportValue",11) )) ||
+ (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef)) ) {
+ /* mark the instruction; its targets are flagged BLOCKED right below */
+ plan[i] = BLOCKED;
+ }
+
+ if( plan[i] == BLOCKED){
+ for ( j= 0; j< p->retc; j++)
+ vars[getArg(p,j)] = BLOCKED;
+ } else {
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == REQUIRED)
+ break;
+ if ( j != p->argc) {
+ for ( j= 0; j< p->retc; j++)
+ vars[getArg(p,j)] = REQUIRED;
+ plan[i] = REQUIRED;
+ }
+ }
+ }
+
+ /* Phase 2 (backward scan): determine all variables/instructions contributing;
+ the inputs of every REQUIRED, non-BLOCKED instruction become REQUIRED as well */
+ mnstr_printf(cntxt->fdout,"#phase 2\n");
+ for ( i = limit -1; i >= 0 ; i--)
+ if ( plan[i] != BLOCKED ){
+ p = old[i];
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == REQUIRED)
+ plan[i] = REQUIRED;
+
+ if( plan[i] == REQUIRED)
+ for ( j= p->retc; j< p->argc; j++)
+ vars[getArg(p,j)] = REQUIRED;
+ }
+ /* Phase 3: determine all variables to be exported: BAT-typed REQUIRED variables
+ read by instructions outside the fragment become results of the return
+ statement, each of them listed once */
+ mnstr_printf(cntxt->fdout,"#phase 3\n");
+ ret= newInstruction(nmb,ASSIGNsymbol);
+ ret->barrier = RETURNsymbol;
+ ret->argc= ret->retc = 0;
+ for ( i = 0; i < limit ; i++)
+ if ( plan[i] != REQUIRED ){
+ p = old[i];
+ for( j = p->retc; j < p->argc; j++)
+ if (vars[getArg(p,j)] == REQUIRED &&
isaBatType(getArgType(nmb,p,j)) ) {
+ for ( k = 0; k < ret->retc; k++)
+ if (getArg(ret,k) == getArg(p,j))
+ break;
+ if ( k == ret->retc)
+ ret= pushReturn(nmb,ret, getArg(p,j));
+ }
+ }
+
+ /* Phase 4: Bake a new function that produces them: copies of the REQUIRED
+ instructions are appended in order, the return statement is pushed just
+ before the END instruction, and bind calls are handed to OPTsliceColumn */
+ for ( i = 0; i < limit ; i++)
+ if( plan[i] == REQUIRED ) {
+ p = copyInstruction(getInstrPtr(mb, i));
+ if ( old[i]->token == ENDsymbol)
+ pushInstruction(nmb,ret);
+ pushInstruction(nmb,p);
+ if (getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef ||
getFunctionId(p) == bindidxRef))
+ OPTsliceColumn(cntxt, nmb, mb, p, slices, nmb->stop-1);
+ }
+
+ /* fix the signature: drop the results of the function header and replace them
+ by the exported variables collected in the return statement */
+ while ( nmb->stmt[0]->retc )
+ delArgument(nmb->stmt[0],0);
+ for( i =0; i< ret->retc; i++)
+ nmb->stmt[0]= pushReturn(nmb, nmb->stmt[0], getArg(ret,i));
+
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"#Remote plan framework\n");
+ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_STMT);
+ msg= optimizeMALBlock(cntxt, nmb);
+ chkProgram(cntxt->nspace, nmb);
+ mnstr_printf(cntxt->fdout,"#partition error %d %s\n", nmb->errors,
msg?msg:"");
+ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_STMT);
+#endif
+ if ( nmb->errors == 0)
+ insertSymbol(cntxt->nspace,s);
+ GDKfree(plan);
+ GDKfree(vars);
+ return 0;
+}
+
/*
* The general tactic is to identify instructions that are blocked in a distributed setting.
* For those instructions we inject a multi-assignment to map their arguments to new variables
@@ -716,8 +877,10 @@ OPTpartitionImplementation(Client cntxt,
getVarConstant(mb,
getArg(slices.target,3)).val.sval,
rowcnt, nrpeers);
-
-
+ /* derive a local plan based on forward flow reasoning */
+ if( OPTplanFragment(cntxt, mb, &slices)== 0) {
+ /* bake the controller function */
+ } else
for(i=0; i < mb->stop; i++){
p= getInstrPtr(mb,i);
@@ -758,7 +921,8 @@ OPTpartitionImplementation(Client cntxt,
/* grouping and aggregation are blocking instruction for the
time being */
if ( (getModuleId(p) == groupRef && (getFunctionId(p) ==
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) ) ||
- getModuleId(p) == aggrRef ) {
+ getModuleId(p) == pqueueRef || getModuleId(p)
== aggrRef ||
+ (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef)) ) {
rsset= newInstruction(mb,ASSIGNsymbol);
for ( j = p->retc; j < p->argc; j++) {
for ( k = p->retc; k < j; k++)
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list