Changeset: a38b3256e2b4 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a38b3256e2b4
Added Files:
        monetdb5/optimizer/opt_volcano.c
        monetdb5/optimizer/opt_volcano.h
Modified Files:
        clients/Tests/MAL-signatures.stable.out
        clients/Tests/MAL-signatures.stable.out.int128
        clients/Tests/exports.stable.out
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
        monetdb5/mal/mal_runtime.c
        monetdb5/mal/mal_runtime.h
        monetdb5/modules/mal/language.mal
        monetdb5/optimizer/Makefile.ag
        monetdb5/optimizer/opt_pipes.c
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        monetdb5/optimizer/opt_support.h
        monetdb5/optimizer/opt_wrapper.c
        monetdb5/optimizer/optimizer.mal
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_optimizer.c
        sql/test/Tests/setoptimizer.stable.err
        sql/test/Tests/setoptimizer.stable.out
        sql/test/Tests/setoptimizer.stable.out.Windows
Branch: rdf
Log Message:

Merge with default


diffs (truncated from 656 to 300 lines):

diff --git a/clients/Tests/MAL-signatures.stable.out b/clients/Tests/MAL-signatures.stable.out
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -38338,6 +38338,10 @@ command language.assert(v:sht,term:str):
 address MALassertSht;
 command language.assert(v:bit,term:str):void 
 address MALassertBit;
+pattern language.block(v:int,w:any...):int 
+address deblockdataflow;
+comment Block on availability of all variables w, and then pass on v
+
 pattern language.call(s:bat[:str]):void 
 address CMDcallBAT;
 comment Evaluate a program stored in a BAT.
@@ -39674,6 +39678,12 @@ comment Collect trace of a specific oper
 
 pattern optimizer.trace():str 
 address OPTwrapper;
+pattern optimizer.volcano(mod:str,fcn:str):str 
+address OPTwrapper;
+comment Simulate volcano style execution
+
+pattern optimizer.volcano():str 
+address OPTwrapper;
 command pcre.imatch(s:str,pat:str):bit 
 address PCREimatch;
 comment Caseless Perl Compatible Regular Expression pattern matching against a string
diff --git a/clients/Tests/MAL-signatures.stable.out.int128 b/clients/Tests/MAL-signatures.stable.out.int128
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -49189,6 +49189,10 @@ command language.assert(v:sht,term:str):
 address MALassertSht;
 command language.assert(v:bit,term:str):void 
 address MALassertBit;
+pattern language.block(v:int,w:any...):int 
+address deblockdataflow;
+comment Block on availability of all variables w, and then pass on v
+
 pattern language.call(s:bat[:str]):void 
 address CMDcallBAT;
 comment Evaluate a program stored in a BAT.
@@ -50533,6 +50537,12 @@ comment Collect trace of a specific oper
 
 pattern optimizer.trace():str 
 address OPTwrapper;
+pattern optimizer.volcano(mod:str,fcn:str):str 
+address OPTwrapper;
+comment Simulate volcano style execution
+
+pattern optimizer.volcano():str 
+address OPTwrapper;
 command pcre.imatch(s:str,pat:str):bit 
 address PCREimatch;
 comment Caseless Perl Compatible Regular Expression pattern matching against a string
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1251,6 +1251,7 @@ str MALpass(Client cntxt, MalBlkPtr mb, 
 str MALpipeline(Client c);
 str MALreader(Client c);
 void MALresourceFairness(lng usec);
+int MALrunningThreads(void);
 str MALstartDataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str MANIFOLDevaluate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str MANIFOLDremapMultiplex(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
@@ -1540,6 +1541,7 @@ int OPTremapImplementation(Client cntxt,
 int OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 int OPTreorderImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str OPTsetDebugStr(void *ret, str *nme);
+int OPTvolcanoImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str OPTwrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PCREilike2(bit *ret, const str *s, const str *pat);
 str PCREilike3(bit *ret, const str *s, const str *pat, const str *esc);
@@ -1933,6 +1935,7 @@ str bindRef;
 str binddbatRef;
 str bindidxRef;
 var_t blobsize(size_t nitems);
+str blockRef;
 str bpmRef;
 str bstreamRef;
 int bstream_create_wrap(Bstream *BS, Stream *S, int *bufsize);
@@ -1993,6 +1996,7 @@ int daytime_fromstr(const char *buf, int
 int daytime_tostr(str *buf, int *len, const daytime *val);
 int daytime_tz_fromstr(const char *buf, int *len, daytime **ret);
 str dblRef;
+str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 void debugFunction(stream *fd, MalBlkPtr mb, MalStkPtr stk, int flg, int first, int size);
 void debugLifespan(Client cntxt, MalBlkPtr mb, Lifespan span);
 str debugOptimizers(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
@@ -2057,6 +2061,7 @@ str generatorRef;
 MALfcn getAddress(stream *out, str filename, str modnme, str fcnname, int silent);
 str getArgDefault(MalBlkPtr mb, InstrPtr p, int idx);
 ptr getArgReference(MalStkPtr stk, InstrPtr pci, int k);
+lng getBatSpace(BAT *b);
 int getBitConstant(MalBlkPtr mb, bit val);
 int getBlockBegin(MalBlkPtr mb, int pc);
 int getBlockExit(MalBlkPtr mb, int pc);
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -380,13 +380,17 @@ DFLOWworker(void *T)
                MT_lock_unset(&flow->flowlock);
 
 #ifdef USE_MAL_ADMISSION
-               if (MALadmission(fe->argclaim, fe->hotclaim)) {
-                       fe->hotclaim = 0;   /* don't assume priority anymore */
-                       fe->maxclaim = 0;
-                       if (todo->last == 0)
-                               MT_sleep_ms(DELAYUNIT);
-                       q_requeue(todo, fe);
-                       continue;
+               if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, 
fe->hotclaim)) {
+                       // never block on deblockdataflow()
+                       p= getInstrPtr(flow->mb,fe->pc);
+                       if( p->fcn != (MALfcn) deblockdataflow){
+                               fe->hotclaim = 0;   /* don't assume priority 
anymore */
+                               fe->maxclaim = 0;
+                               if (todo->last == 0)
+                                       MT_sleep_ms(DELAYUNIT);
+                               q_requeue(todo, fe);
+                               continue;
+                       }
                }
 #endif
                error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 
1, flow->stk, 0, 0);
@@ -931,6 +935,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        return msg;
 }
 
+str
+deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    int *ret = getArgReference_int(stk,pci,0); // argument 0: the int result slot
+    int *val = getArgReference_int(stk,pci,1); // argument 1: the int input (v in language.block)
+    (void) cntxt; // unused
+    (void) mb; // unused
+    *ret = *val; // pass v through; any further arguments are never read here (presumably they only create dataflow dependencies - TODO confirm)
+    return MAL_SUCCEED;
+}
+
 void
 stopMALdataflow(void)
 {
diff --git a/monetdb5/mal/mal_dataflow.h b/monetdb5/mal/mal_dataflow.h
--- a/monetdb5/mal/mal_dataflow.h
+++ b/monetdb5/mal/mal_dataflow.h
@@ -13,5 +13,6 @@
 #include "mal_client.h"
 
 mal_export str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int 
stoppc, MalStkPtr stk);
+mal_export str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 
 #endif /*  _MAL_DATAFLOW_H*/
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -206,7 +206,7 @@ MALresourceFairness(lng usec)
                        if (rss < MEMORY_THRESHOLD )
                                break;
                        threads = GDKnr_threads > 0 ? GDKnr_threads : 1;
-                       delay = (unsigned int) ( ((double)DELAYUNIT * running) 
/ threads);
+                       delay = (unsigned int) ( ((double)DELAYUNIT * running) 
/ threads) + 1;
                        if (delay) {
                                if ( delayed++ == 0){
                                                PARDEBUG 
mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory  "SZFMT"[%f]\n", 
delay, clk, rss, MEMORY_THRESHOLD );
@@ -221,6 +221,13 @@ MALresourceFairness(lng usec)
        }
 }
 
+// Get a hint on the parallel behavior: returns the current value of 'running' as-is (no locking is done here)
+int
+MALrunningThreads(void)
+{
+       return running; // 'running' is defined outside this hunk; presumably the count of active workers - TODO confirm
+}
+
 void
 initResource(void)
 {
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -12,7 +12,7 @@
 #include "mal_interpreter.h"
 
 #define TIMESLICE  2000000 /* usec */
-#define DELAYUNIT 5 /* ms delay in parallel processing decisions */
+#define DELAYUNIT 2 /* ms delay in parallel processing decisions */
 #define MAX_DELAYS 1000 /* never wait forever */
 
 #define USE_MAL_ADMISSION
@@ -22,5 +22,6 @@ mal_export int MALadmission(lng argclaim
 
 mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int 
i, int flag);
 mal_export void MALresourceFairness(lng usec);
+mal_export int MALrunningThreads(void);
 
 #endif /*  _MAL_RESOURCE_H*/
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -22,7 +22,7 @@
 #include "mal_private.h"
 
 #define heapinfo(X) ((X) && (X)->base ? (X)->free: 0)
-#define hashinfo(X) (((X) && (X) != (Hash *) 1 && (X)->mask)? ((X)->mask + 
(X)->lim + 1) * sizeof(int) + sizeof(*(X)) + cnt * sizeof(int):  0)
+#define hashinfo(X) ( (X)? heapinfo((X)->heap):0)
 
 // Keep a queue of running queries
 QueryQueue QRYqueue;
@@ -212,6 +212,19 @@ runtimeProfileExit(Client cntxt, MalBlkP
  * may trigger a side effect, such as creating a hash-index.
  * Side effects are ignored.
  */
+
+lng
+getBatSpace(BAT *b){ /* sums heapinfo() of the tail heap and vheap, hashinfo() of the tail hash and IMPSimprintsize(b); returns 0 for a NULL b */
+       lng space=0;
+       if( b == NULL)
+               return 0;
+       if( b->T) space += heapinfo(&b->T->heap);
+       if( b->T && b->T->vheap) space += heapinfo(b->T->vheap); /* b->T must be checked before reaching through it, as the neighbouring lines do */
+       if(b->T) space += hashinfo(b->T->hash);
+       space += IMPSimprintsize(b);
+       return space;
+}
+
 lng getVolume(MalStkPtr stk, InstrPtr pci, int rd)
 {
        int i, limit;
diff --git a/monetdb5/mal/mal_runtime.h b/monetdb5/mal/mal_runtime.h
--- a/monetdb5/mal/mal_runtime.h
+++ b/monetdb5/mal/mal_runtime.h
@@ -41,6 +41,7 @@ mal_export void runtimeProfileBegin(Clie
 mal_export void runtimeProfileExit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci, RuntimeProfile prof);
 mal_export void finishSessionProfiler(Client cntxt);
 mal_export lng getVolume(MalStkPtr stk, InstrPtr pci, int rd);
+mal_export lng getBatSpace(BAT *b);
 
 mal_export QueryQueue QRYqueue;
 #endif
diff --git a/monetdb5/modules/mal/language.mal b/monetdb5/modules/mal/language.mal
--- a/monetdb5/modules/mal/language.mal
+++ b/monetdb5/modules/mal/language.mal
@@ -51,6 +51,10 @@ pattern pass(v:any_1)
 address MALpass
 comment "Cheap instruction to disgard storage while retaining the dataflow 
dependency";
 
+pattern block(v:int,w:any...):int
+address deblockdataflow
+comment "Block on availability of all variables w, and then pass on v";
+
 pattern register(m:str,f:str,code:str,help:str):void
 address CMDregisterFunction
 comment"Compile the code string to MAL and register it as a function.";
diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag
--- a/monetdb5/optimizer/Makefile.ag
+++ b/monetdb5/optimizer/Makefile.ag
@@ -51,6 +51,7 @@ lib_optimizer = {
                opt_support.c opt_support.h \
                opt_pushselect.c opt_pushselect.h \
                opt_profiler.c opt_profiler.h \
+               opt_volcano.c opt_volcano.h \
                opt_wrapper.c
 }
 
diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c
--- a/monetdb5/optimizer/opt_pipes.c
+++ b/monetdb5/optimizer/opt_pipes.c
@@ -90,6 +90,37 @@ static struct PIPELINES {
         "optimizer.profiler();"
         "optimizer.garbageCollector();",
         "stable", NULL, NULL, 1},
+/*
+ * Volcano style execution produces a sequence of blocks from the source 
relation
+ */
+       {"volcano_pipe",
+        "optimizer.inline();"
+        "optimizer.candidates();"
+        "optimizer.remap();"
+        "optimizer.costModel();"
+        "optimizer.coercions();"
+        "optimizer.evaluate();"
+        "optimizer.aliases();"
+        "optimizer.pushselect();"
+        "optimizer.mitosis();"
+        "optimizer.mergetable();"
+        "optimizer.deadcode();"
+        "optimizer.aliases();"
+        "optimizer.constants();"
+        "optimizer.commonTerms();"
+        "optimizer.projectionpath();"
+        "optimizer.reorder();"
+        "optimizer.deadcode();"
+        "optimizer.reduce();"
+        "optimizer.matpack();"
+        "optimizer.dataflow();"
+        "optimizer.querylog();"
+        "optimizer.multiplex();"
+        "optimizer.generator();"
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to