Changeset: a38b3256e2b4 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a38b3256e2b4 Added Files: monetdb5/optimizer/opt_volcano.c monetdb5/optimizer/opt_volcano.h Modified Files: clients/Tests/MAL-signatures.stable.out clients/Tests/MAL-signatures.stable.out.int128 clients/Tests/exports.stable.out monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_dataflow.h monetdb5/mal/mal_resource.c monetdb5/mal/mal_resource.h monetdb5/mal/mal_runtime.c monetdb5/mal/mal_runtime.h monetdb5/modules/mal/language.mal monetdb5/optimizer/Makefile.ag monetdb5/optimizer/opt_pipes.c monetdb5/optimizer/opt_prelude.c monetdb5/optimizer/opt_prelude.h monetdb5/optimizer/opt_support.h monetdb5/optimizer/opt_wrapper.c monetdb5/optimizer/optimizer.mal sql/backends/monet5/sql.c sql/backends/monet5/sql_optimizer.c sql/test/Tests/setoptimizer.stable.err sql/test/Tests/setoptimizer.stable.out sql/test/Tests/setoptimizer.stable.out.Windows Branch: rdf Log Message:
Merge with default diffs (truncated from 656 to 300 lines): diff --git a/clients/Tests/MAL-signatures.stable.out b/clients/Tests/MAL-signatures.stable.out --- a/clients/Tests/MAL-signatures.stable.out +++ b/clients/Tests/MAL-signatures.stable.out @@ -38338,6 +38338,10 @@ command language.assert(v:sht,term:str): address MALassertSht; command language.assert(v:bit,term:str):void address MALassertBit; +pattern language.block(v:int,w:any...):int +address deblockdataflow; +comment Block on availability of all variables w, and then pass on v + pattern language.call(s:bat[:str]):void address CMDcallBAT; comment Evaluate a program stored in a BAT. @@ -39674,6 +39678,12 @@ comment Collect trace of a specific oper pattern optimizer.trace():str address OPTwrapper; +pattern optimizer.volcano(mod:str,fcn:str):str +address OPTwrapper; +comment Simulate volcano style execution + +pattern optimizer.volcano():str +address OPTwrapper; command pcre.imatch(s:str,pat:str):bit address PCREimatch; comment Caseless Perl Compatible Regular Expression pattern matching against a string diff --git a/clients/Tests/MAL-signatures.stable.out.int128 b/clients/Tests/MAL-signatures.stable.out.int128 --- a/clients/Tests/MAL-signatures.stable.out.int128 +++ b/clients/Tests/MAL-signatures.stable.out.int128 @@ -49189,6 +49189,10 @@ command language.assert(v:sht,term:str): address MALassertSht; command language.assert(v:bit,term:str):void address MALassertBit; +pattern language.block(v:int,w:any...):int +address deblockdataflow; +comment Block on availability of all variables w, and then pass on v + pattern language.call(s:bat[:str]):void address CMDcallBAT; comment Evaluate a program stored in a BAT. @@ -50533,6 +50537,12 @@ comment Collect trace of a specific oper pattern optimizer.trace():str address OPTwrapper; +pattern optimizer.volcano(mod:str,fcn:str):str +address OPTwrapper; +comment Simulate volcano style execution + +pattern optimizer.volcano():str +address OPTwrapper; command pcre.imatch(s:str,pat:str):bit address PCREimatch; comment Caseless Perl Compatible Regular Expression pattern matching against a string diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -1251,6 +1251,7 @@ str MALpass(Client cntxt, MalBlkPtr mb, str MALpipeline(Client c); str MALreader(Client c); void MALresourceFairness(lng usec); +int MALrunningThreads(void); str MALstartDataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); str MANIFOLDevaluate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); str MANIFOLDremapMultiplex(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); @@ -1540,6 +1541,7 @@ int OPTremapImplementation(Client cntxt, int OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); int OPTreorderImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); str OPTsetDebugStr(void *ret, str *nme); +int OPTvolcanoImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); str OPTwrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); str PCREilike2(bit *ret, const str *s, const str *pat); str PCREilike3(bit *ret, const str *s, const str *pat, const str *esc); @@ -1933,6 +1935,7 @@ str bindRef; str binddbatRef; str bindidxRef; var_t blobsize(size_t nitems); +str blockRef; str bpmRef; str bstreamRef; int bstream_create_wrap(Bstream *BS, Stream *S, int *bufsize); @@ -1993,6 +1996,7 @@ int daytime_fromstr(const char *buf, int int daytime_tostr(str *buf, int *len, const daytime *val); int daytime_tz_fromstr(const char *buf, int *len, daytime **ret); str dblRef; +str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); void debugFunction(stream *fd, MalBlkPtr mb, MalStkPtr stk, int flg, int first, int size); void debugLifespan(Client cntxt, MalBlkPtr mb, Lifespan span); str debugOptimizers(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); @@ -2057,6 +2061,7 @@ str generatorRef; MALfcn getAddress(stream *out, str filename, str modnme, str fcnname, int silent); str getArgDefault(MalBlkPtr mb, InstrPtr p, int idx); ptr getArgReference(MalStkPtr stk, InstrPtr pci, int k); +lng getBatSpace(BAT *b); int getBitConstant(MalBlkPtr mb, bit val); int getBlockBegin(MalBlkPtr mb, int pc); int getBlockExit(MalBlkPtr mb, int pc); diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -380,13 +380,17 @@ DFLOWworker(void *T) MT_lock_unset(&flow->flowlock); #ifdef USE_MAL_ADMISSION - if (MALadmission(fe->argclaim, fe->hotclaim)) { - fe->hotclaim = 0; /* don't assume priority anymore */ - fe->maxclaim = 0; - if (todo->last == 0) - MT_sleep_ms(DELAYUNIT); - q_requeue(todo, fe); - continue; + if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, fe->hotclaim)) { + // never block on deblockdataflow() + p= getInstrPtr(flow->mb,fe->pc); + if( p->fcn != (MALfcn) deblockdataflow){ + fe->hotclaim = 0; /* don't assume priority anymore */ + fe->maxclaim = 0; + if (todo->last == 0) + MT_sleep_ms(DELAYUNIT); + q_requeue(todo, fe); + continue; + } } #endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); @@ -931,6 +935,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m return msg; } +str +deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int *ret = getArgReference_int(stk,pci,0); + int *val = getArgReference_int(stk,pci,1); + (void) cntxt; + (void) mb; + *ret = *val; + return MAL_SUCCEED; +} + void stopMALdataflow(void) { diff --git a/monetdb5/mal/mal_dataflow.h b/monetdb5/mal/mal_dataflow.h --- a/monetdb5/mal/mal_dataflow.h +++ b/monetdb5/mal/mal_dataflow.h @@ -13,5 +13,6 @@ #include "mal_client.h" mal_export str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk); +mal_export str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); #endif /* _MAL_DATAFLOW_H*/ diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c --- a/monetdb5/mal/mal_resource.c +++ b/monetdb5/mal/mal_resource.c @@ -206,7 +206,7 @@ MALresourceFairness(lng usec) if (rss < MEMORY_THRESHOLD ) break; threads = GDKnr_threads > 0 ? GDKnr_threads : 1; - delay = (unsigned int) ( ((double)DELAYUNIT * running) / threads); + delay = (unsigned int) ( ((double)DELAYUNIT * running) / threads) + 1; if (delay) { if ( delayed++ == 0){ PARDEBUG mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory "SZFMT"[%f]\n", delay, clk, rss, MEMORY_THRESHOLD ); @@ -221,6 +221,13 @@ MALresourceFairness(lng usec) } } +// Get a hint on the parallel behavior +int +MALrunningThreads(void) +{ + return running; +} + void initResource(void) { diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h --- a/monetdb5/mal/mal_resource.h +++ b/monetdb5/mal/mal_resource.h @@ -12,7 +12,7 @@ #include "mal_interpreter.h" #define TIMESLICE 2000000 /* usec */ -#define DELAYUNIT 5 /* ms delay in parallel processing decisions */ +#define DELAYUNIT 2 /* ms delay in parallel processing decisions */ #define MAX_DELAYS 1000 /* never wait forever */ #define USE_MAL_ADMISSION @@ -22,5 +22,6 @@ mal_export int MALadmission(lng argclaim mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag); mal_export void MALresourceFairness(lng usec); +mal_export int MALrunningThreads(void); #endif /* _MAL_RESOURCE_H*/ diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c --- a/monetdb5/mal/mal_runtime.c +++ b/monetdb5/mal/mal_runtime.c @@ -22,7 +22,7 @@ #include "mal_private.h" #define heapinfo(X) ((X) && (X)->base ? (X)->free: 0) -#define hashinfo(X) (((X) && (X) != (Hash *) 1 && (X)->mask)? ((X)->mask + (X)->lim + 1) * sizeof(int) + sizeof(*(X)) + cnt * sizeof(int): 0) +#define hashinfo(X) ( (X)? heapinfo((X)->heap):0) // Keep a queue of running queries QueryQueue QRYqueue; @@ -212,6 +212,19 @@ runtimeProfileExit(Client cntxt, MalBlkP * may trigger a side effect, such as creating a hash-index. * Side effects are ignored. */ + +lng +getBatSpace(BAT *b){ + lng space=0; + if( b == NULL) + return 0; + if( b->T) space += heapinfo(&b->T->heap); + if( b->T->vheap) space += heapinfo(b->T->vheap); + if(b->T) space += hashinfo(b->T->hash); + space += IMPSimprintsize(b); + return space; +} + lng getVolume(MalStkPtr stk, InstrPtr pci, int rd) { int i, limit; diff --git a/monetdb5/mal/mal_runtime.h b/monetdb5/mal/mal_runtime.h --- a/monetdb5/mal/mal_runtime.h +++ b/monetdb5/mal/mal_runtime.h @@ -41,6 +41,7 @@ mal_export void runtimeProfileBegin(Clie mal_export void runtimeProfileExit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, RuntimeProfile prof); mal_export void finishSessionProfiler(Client cntxt); mal_export lng getVolume(MalStkPtr stk, InstrPtr pci, int rd); +mal_export lng getBatSpace(BAT *b); mal_export QueryQueue QRYqueue; #endif diff --git a/monetdb5/modules/mal/language.mal b/monetdb5/modules/mal/language.mal --- a/monetdb5/modules/mal/language.mal +++ b/monetdb5/modules/mal/language.mal @@ -51,6 +51,10 @@ pattern pass(v:any_1) address MALpass comment "Cheap instruction to disgard storage while retaining the dataflow dependency"; +pattern block(v:int,w:any...):int +address deblockdataflow +comment "Block on availability of all variables w, and then pass on v"; + pattern register(m:str,f:str,code:str,help:str):void address CMDregisterFunction comment"Compile the code string to MAL and register it as a function."; diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag --- a/monetdb5/optimizer/Makefile.ag +++ b/monetdb5/optimizer/Makefile.ag @@ -51,6 +51,7 @@ lib_optimizer = { opt_support.c opt_support.h \ opt_pushselect.c opt_pushselect.h \ opt_profiler.c opt_profiler.h \ + opt_volcano.c opt_volcano.h \ opt_wrapper.c } diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c --- a/monetdb5/optimizer/opt_pipes.c +++ b/monetdb5/optimizer/opt_pipes.c @@ -90,6 +90,37 @@ static struct PIPELINES { "optimizer.profiler();" "optimizer.garbageCollector();", "stable", NULL, NULL, 1}, +/* + * Volcano style execution produces a sequence of blocks from the source relation + */ + {"volcano_pipe", + "optimizer.inline();" + "optimizer.candidates();" + "optimizer.remap();" + "optimizer.costModel();" + "optimizer.coercions();" + "optimizer.evaluate();" + "optimizer.aliases();" + "optimizer.pushselect();" + "optimizer.mitosis();" + "optimizer.mergetable();" + "optimizer.deadcode();" + "optimizer.aliases();" + "optimizer.constants();" + "optimizer.commonTerms();" + "optimizer.projectionpath();" + "optimizer.reorder();" + "optimizer.deadcode();" + "optimizer.reduce();" + "optimizer.matpack();" + "optimizer.dataflow();" + "optimizer.querylog();" + "optimizer.multiplex();" + "optimizer.generator();" _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list