Changeset: 4ab74024a766 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4ab74024a766
Added Files:
	sql/backends/monet5/iot/Tests/export00.sql
	sql/backends/monet5/iot/Tests/export00.stable.err
	sql/backends/monet5/iot/Tests/export00.stable.out
Modified Files:
	sql/backends/monet5/iot/50_iot.sql
	sql/backends/monet5/iot/Tests/All
	sql/backends/monet5/iot/basket.c
	sql/backends/monet5/iot/basket.h
	sql/backends/monet5/iot/iot.mal
	sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
Added simple export basket function
diffs (288 lines):
diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -60,6 +60,9 @@ create procedure iot.period(n integer)
 create procedure iot.import("schema" string, "table" string, dirpath string)
 external name iot.import;
+create procedure iot.export("schema" string, "table" string, dirpath string)
+ external name iot.export;
+
 -- input/output places
 create procedure iot.receptor("schema" string, "table" string, dir string)
 external name iot.receptor;
diff --git a/sql/backends/monet5/iot/Tests/All b/sql/backends/monet5/iot/Tests/All
--- a/sql/backends/monet5/iot/Tests/All
+++ b/sql/backends/monet5/iot/Tests/All
@@ -9,3 +9,4 @@ receptor01
 #petrinet00
 webtest
 inputoutput
+export00
diff --git a/sql/backends/monet5/iot/Tests/export00.sql b/sql/backends/monet5/iot/Tests/export00.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/iot/Tests/export00.sql
@@ -0,0 +1,17 @@
+-- A simple continuous query.
+set schema iot; +set optimizer='iot_pipe'; + +create stream table tempsout (t timestamp, sensor integer, val decimal(8,2)) ; + +insert into tempsout values('2005-09-23 12:34:26.736',1,12.34); +insert into tempsout values('2005-09-23 12:34:26.736',1,12.34); +insert into tempsout values('2005-09-23 12:34:26.736',1,12.34); +select * from tempsout; + +declare basketdir string; +set basketdir= '/ufs/mk/baskets/measures/temperatures/'; + +call iot.export('iot','tempsout', concat(basketdir,'20')); + +drop table tempsout; diff --git a/sql/backends/monet5/iot/Tests/export00.stable.err b/sql/backends/monet5/iot/Tests/export00.stable.err new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/iot/Tests/export00.stable.err @@ -0,0 +1,34 @@ +stderr of test 'export00` in directory 'sql/backends/monet5/iot` itself: + + +# 15:30:46 > +# 15:30:46 > "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" "mapi_open=true" "--set" "mapi_port=35125" "--set" "mapi_usock=/var/tmp/mtest-21110/.s.monetdb.35125" "--set" "monet_prompt=" "--forcemito" "--dbpath=/export/scratch1/mk/iot//Linux/var/MonetDB/mTests_sql_backends_monet5_iot" +# 15:30:46 > + +# builtin opt gdk_dbpath = /export/scratch1/mk/iot//Linux/var/monetdb5/dbfarm/demo +# builtin opt gdk_debug = 0 +# builtin opt gdk_vmtrim = no +# builtin opt monet_prompt = > +# builtin opt monet_daemon = no +# builtin opt mapi_port = 50000 +# builtin opt mapi_open = false +# builtin opt mapi_autosense = false +# builtin opt sql_optimizer = default_pipe +# builtin opt sql_debug = 0 +# cmdline opt gdk_nr_threads = 0 +# cmdline opt mapi_open = true +# cmdline opt mapi_port = 35125 +# cmdline opt mapi_usock = /var/tmp/mtest-21110/.s.monetdb.35125 +# cmdline opt monet_prompt = +# cmdline opt gdk_dbpath = /export/scratch1/mk/iot//Linux/var/MonetDB/mTests_sql_backends_monet5_iot +# cmdline opt gdk_debug = 536870922 + +# 15:30:47 > +# 15:30:47 > "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" "--host=/var/tmp/mtest-21110" "--port=35125" +# 
15:30:47 > + + +# 15:30:47 > +# 15:30:47 > "Done." +# 15:30:47 > + diff --git a/sql/backends/monet5/iot/Tests/export00.stable.out b/sql/backends/monet5/iot/Tests/export00.stable.out new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/iot/Tests/export00.stable.out @@ -0,0 +1,52 @@ +stdout of test 'export00` in directory 'sql/backends/monet5/iot` itself: + + +# 15:30:46 > +# 15:30:46 > "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" "mapi_open=true" "--set" "mapi_port=35125" "--set" "mapi_usock=/var/tmp/mtest-21110/.s.monetdb.35125" "--set" "monet_prompt=" "--forcemito" "--dbpath=/export/scratch1/mk/iot//Linux/var/MonetDB/mTests_sql_backends_monet5_iot" +# 15:30:46 > + +# MonetDB 5 server v11.24.0 +# This is an unreleased version +# Serving database 'mTests_sql_backends_monet5_iot', using 8 threads +# Compiled for x86_64-unknown-linux-gnu/64bit with 64bit OIDs and 128bit integers dynamically linked +# Found 15.589 GiB available main-memory. +# Copyright (c) 1993-July 2008 CWI. +# Copyright (c) August 2008-2016 MonetDB B.V., all rights reserved +# Visit http://www.monetdb.org/ for further information +# Listening for connection requests on mapi:monetdb://vienna.da.cwi.nl:35125/ +# Listening for UNIX domain connection requests on mapi:monetdb:///var/tmp/mtest-21110/.s.monetdb.35125 +# MonetDB/GIS module loaded +# MonetDB/SQL module loaded +# MonetDB/iot loaded + +Ready. 
+ +# 15:30:47 > +# 15:30:47 > "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" "--host=/var/tmp/mtest-21110" "--port=35125" +# 15:30:47 > + +#set schema iot; +#set optimizer='iot_pipe'; +#create stream table tempsout (t timestamp, sensor integer, val decimal(8,2)) ; +#insert into tempsout values('2005-09-23 12:34:26.736',1,12.34); +[ 1 ] +#insert into tempsout values('2005-09-23 12:34:26.736',1,12.34); +[ 1 ] +#insert into tempsout values('2005-09-23 12:34:26.736',1,12.34); +[ 1 ] +#select * from tempsout; +% iot.tempsout, iot.tempsout, iot.tempsout # table_name +% t, sensor, val # name +% timestamp, int, decimal # type +% 26, 1, 10 # length +[ 2005-09-23 12:34:26.736000, 1, 12.34 ] +[ 2005-09-23 12:34:26.736000, 1, 12.34 ] +[ 2005-09-23 12:34:26.736000, 1, 12.34 ] +#declare basketdir string; +#set basketdir= '/ufs/mk/baskets/measures/temperatures/'; +#drop table tempsout; + +# 15:30:47 > +# 15:30:47 > "Done." +# 15:30:47 > + diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c --- a/sql/backends/monet5/iot/basket.c +++ b/sql/backends/monet5/iot/basket.c @@ -504,6 +504,108 @@ BSKTimport(Client cntxt, MalBlkPtr mb, M return BSKTimportInternal(cntxt,bskt); } +static str +BSKTexportInternal(Client cntxt, int bskt) +{ + char buf[PATHLENGTH]; + BAT *b; + int i; + str msg= MAL_SUCCEED; + FILE *f; + long fsize; + str dir = baskets[bskt].source; + str cname= NULL; + + (void)cntxt; + // check access permission to directory first + if( access (dir , F_OK | R_OK)){ + throw(SQL, "iot.basket", "Could not access the basket directory %s. 
error %d",dir,errno); + } + + /* check for leftover files */ + for( i=0; i < MAXCOLS && baskets[bskt].cols[i]; i++){ + cname = baskets[bskt].cols[i]; + snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, cname); + _DEBUG_BASKET_ mnstr_printf(BSKTout,"Check for the file %s\n",buf); + if( !access (buf,R_OK)) + throw(MAL,"iot.export","Left over %s file %s\n",cname, buf); + b = baskets[bskt].bats[i]; + if( b == 0) + throw(MAL,"iot.export","Could not access the column %s\n",cname); + } + + // types are already checked during stream initialization + MT_lock_set(&iotLock); + for( i=0; i < MAXCOLS && baskets[bskt].cols[i]; i++){ + cname = baskets[bskt].cols[i]; + snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, cname); + _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf); + f= fopen(buf,"w"); + if( f == NULL){ + msg= createException(MAL,"iot.export","Could not access the column %s file %s\n",cname, buf); + break; + } + b = baskets[bskt].bats[i]; + assert( b); + + switch(ATOMstorage(b->ttype)){ + case TYPE_bit: + case TYPE_bte: + case TYPE_sht: + case TYPE_int: + case TYPE_void: + case TYPE_oid: + case TYPE_flt: + case TYPE_dbl: + case TYPE_lng: +#ifdef HAVE_HGE + case TYPE_hge: +#endif + /* append the binary partition */ + fsize = BATcount(b) * ATOMsize(b->ttype); + if( fwrite(Tloc(b, BUNlast(b)),1,fsize, f) != (size_t) fsize){ + (void) fclose(f); + msg= createException(MAL,"iot.export","Could not write complete basket file %s\n",baskets[bskt].cols[i]); + goto recover; + } + break; + case TYPE_str: + msg= createException(MAL,"iot.export","Export type string not yet supported\n"); + break; + default: + msg= createException(MAL,"iot.export","export type not yet supported\n"); + } + (void) fclose(f); + } + + /* reset all BATs when they are exported */ + for( i=0; i < MAXCOLS && baskets[bskt].cols[i]; i++){ + b = baskets[bskt].bats[i]; + assert( b ); + BATsetcount(b,0); + } + +recover: + MT_lock_unset(&iotLock); + return msg; +} + +str +BSKTexport(Client cntxt, MalBlkPtr 
mb, MalStkPtr stk, InstrPtr pci) +{ + str sch = *getArgReference_str(stk, pci, 1); + str tbl = *getArgReference_str(stk, pci, 2); + str dir = *getArgReference_str(stk, pci, 3); + int bskt; + + BSKTregisterInternal(cntxt, mb, sch, tbl); + bskt = BSKTlocate(sch,tbl); + if (bskt == 0) + throw(SQL, "iot.basket", "Could not find the basket %s.%s",sch,tbl); + baskets[bskt].source = GDKstrdup(dir); + return BSKTexportInternal(cntxt,bskt); +} + /* remove tuples from a basket according to the sliding policy */ #define ColumnShift(B,TPE, STRIDE) { \ TPE *first= (TPE*) Tloc(B, BUNfirst(B));\ diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h --- a/sql/backends/monet5/iot/basket.h +++ b/sql/backends/monet5/iot/basket.h @@ -93,6 +93,7 @@ iot_export str BSKTappend(Client cntxt, iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTimportInternal(Client cntxt, int bskt); iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); +iot_export str BSKTexport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); iot_export str BSKTunlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); diff --git a/sql/backends/monet5/iot/iot.mal b/sql/backends/monet5/iot/iot.mal --- a/sql/backends/monet5/iot/iot.mal +++ b/sql/backends/monet5/iot/iot.mal @@ -77,6 +77,10 @@ pattern iot.import(sch:str, tbl:str, dir address BSKTimport comment "Import a single directory with the binary files for a stream table"; +pattern iot.export(sch:str, tbl:str, dir:str):void +address BSKTexport +comment "Export a stream table to binary files; + pattern iot.baskets()(sch:bat[:str],nme:bat[:str], status:bat[:str], threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int], timestride:bat[:int], heartbeat:bat[:int], 
seen:bat[:timestamp], events:bat[:int])
 address BSKTtable
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -84,7 +84,7 @@ int pnettop = 0;
 int enabled[MAXPN]; /*array that contains the id's of all queries that are enable to fire*/
 static int pnstatus = PNINIT;
-static int cycleDelay = 500; /* be careful, it affects response/throughput timings */
+static int cycleDelay = 50; /* be careful, it affects response/throughput timings */
 str PNperiod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list