Changeset: 4ab74024a766 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4ab74024a766
Added Files:
        sql/backends/monet5/iot/Tests/export00.sql
        sql/backends/monet5/iot/Tests/export00.stable.err
        sql/backends/monet5/iot/Tests/export00.stable.out
Modified Files:
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/Tests/All
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/iot.mal
        sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:

Added simple export basket function


diffs (288 lines):

diff --git a/sql/backends/monet5/iot/50_iot.sql 
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -60,6 +60,9 @@ create procedure iot.period(n integer)
 create procedure iot.import("schema" string, "table" string, dirpath string)
        external name iot.import;
 
+create procedure iot.export("schema" string, "table" string, dirpath string)
+       external name iot.export;
+
 -- input/output places
 create procedure iot.receptor("schema" string, "table" string, dir string)
        external name iot.receptor;
diff --git a/sql/backends/monet5/iot/Tests/All 
b/sql/backends/monet5/iot/Tests/All
--- a/sql/backends/monet5/iot/Tests/All
+++ b/sql/backends/monet5/iot/Tests/All
@@ -9,3 +9,4 @@ receptor01
 #petrinet00
 webtest
 inputoutput
+export00
diff --git a/sql/backends/monet5/iot/Tests/export00.sql 
b/sql/backends/monet5/iot/Tests/export00.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/iot/Tests/export00.sql
@@ -0,0 +1,17 @@
+-- A simple continuous query.
+set schema iot;
+set optimizer='iot_pipe';
+
+create stream table tempsout (t timestamp, sensor integer, val decimal(8,2)) ;
+
+insert into tempsout values('2005-09-23 12:34:26.736',1,12.34);
+insert into tempsout values('2005-09-23 12:34:26.736',1,12.34);
+insert into tempsout values('2005-09-23 12:34:26.736',1,12.34);
+select * from tempsout;
+
+declare basketdir string;
+set basketdir= '/ufs/mk/baskets/measures/temperatures/';
+
+call iot.export('iot','tempsout', concat(basketdir,'20'));
+
+drop table tempsout;
diff --git a/sql/backends/monet5/iot/Tests/export00.stable.err 
b/sql/backends/monet5/iot/Tests/export00.stable.err
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/iot/Tests/export00.stable.err
@@ -0,0 +1,34 @@
+stderr of test 'export00` in directory 'sql/backends/monet5/iot` itself:
+
+
+# 15:30:46 >  
+# 15:30:46 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=35125" "--set" 
"mapi_usock=/var/tmp/mtest-21110/.s.monetdb.35125" "--set" "monet_prompt=" 
"--forcemito" 
"--dbpath=/export/scratch1/mk/iot//Linux/var/MonetDB/mTests_sql_backends_monet5_iot"
+# 15:30:46 >  
+
+# builtin opt  gdk_dbpath = 
/export/scratch1/mk/iot//Linux/var/monetdb5/dbfarm/demo
+# builtin opt  gdk_debug = 0
+# builtin opt  gdk_vmtrim = no
+# builtin opt  monet_prompt = >
+# builtin opt  monet_daemon = no
+# builtin opt  mapi_port = 50000
+# builtin opt  mapi_open = false
+# builtin opt  mapi_autosense = false
+# builtin opt  sql_optimizer = default_pipe
+# builtin opt  sql_debug = 0
+# cmdline opt  gdk_nr_threads = 0
+# cmdline opt  mapi_open = true
+# cmdline opt  mapi_port = 35125
+# cmdline opt  mapi_usock = /var/tmp/mtest-21110/.s.monetdb.35125
+# cmdline opt  monet_prompt = 
+# cmdline opt  gdk_dbpath = 
/export/scratch1/mk/iot//Linux/var/MonetDB/mTests_sql_backends_monet5_iot
+# cmdline opt  gdk_debug = 536870922
+
+# 15:30:47 >  
+# 15:30:47 >  "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" 
"--host=/var/tmp/mtest-21110" "--port=35125"
+# 15:30:47 >  
+
+
+# 15:30:47 >  
+# 15:30:47 >  "Done."
+# 15:30:47 >  
+
diff --git a/sql/backends/monet5/iot/Tests/export00.stable.out 
b/sql/backends/monet5/iot/Tests/export00.stable.out
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/iot/Tests/export00.stable.out
@@ -0,0 +1,52 @@
+stdout of test 'export00` in directory 'sql/backends/monet5/iot` itself:
+
+
+# 15:30:46 >  
+# 15:30:46 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=35125" "--set" 
"mapi_usock=/var/tmp/mtest-21110/.s.monetdb.35125" "--set" "monet_prompt=" 
"--forcemito" 
"--dbpath=/export/scratch1/mk/iot//Linux/var/MonetDB/mTests_sql_backends_monet5_iot"
+# 15:30:46 >  
+
+# MonetDB 5 server v11.24.0
+# This is an unreleased version
+# Serving database 'mTests_sql_backends_monet5_iot', using 8 threads
+# Compiled for x86_64-unknown-linux-gnu/64bit with 64bit OIDs and 128bit 
integers dynamically linked
+# Found 15.589 GiB available main-memory.
+# Copyright (c) 1993-July 2008 CWI.
+# Copyright (c) August 2008-2016 MonetDB B.V., all rights reserved
+# Visit http://www.monetdb.org/ for further information
+# Listening for connection requests on mapi:monetdb://vienna.da.cwi.nl:35125/
+# Listening for UNIX domain connection requests on 
mapi:monetdb:///var/tmp/mtest-21110/.s.monetdb.35125
+# MonetDB/GIS module loaded
+# MonetDB/SQL module loaded
+# MonetDB/iot loaded
+
+Ready.
+
+# 15:30:47 >  
+# 15:30:47 >  "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" 
"--host=/var/tmp/mtest-21110" "--port=35125"
+# 15:30:47 >  
+
+#set schema iot;
+#set optimizer='iot_pipe';
+#create stream table tempsout (t timestamp, sensor integer, val decimal(8,2)) ;
+#insert into tempsout values('2005-09-23 12:34:26.736',1,12.34);
+[ 1    ]
+#insert into tempsout values('2005-09-23 12:34:26.736',1,12.34);
+[ 1    ]
+#insert into tempsout values('2005-09-23 12:34:26.736',1,12.34);
+[ 1    ]
+#select * from tempsout;
+% iot.tempsout,        iot.tempsout,   iot.tempsout # table_name
+% t,   sensor, val # name
+% timestamp,   int,    decimal # type
+% 26,  1,      10 # length
+[ 2005-09-23 12:34:26.736000,  1,      12.34   ]
+[ 2005-09-23 12:34:26.736000,  1,      12.34   ]
+[ 2005-09-23 12:34:26.736000,  1,      12.34   ]
+#declare basketdir string;
+#set basketdir= '/ufs/mk/baskets/measures/temperatures/';
+#drop table tempsout;
+
+# 15:30:47 >  
+# 15:30:47 >  "Done."
+# 15:30:47 >  
+
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -504,6 +504,108 @@ BSKTimport(Client cntxt, MalBlkPtr mb, M
        return BSKTimportInternal(cntxt,bskt);
 }
 
+static str
+BSKTexportInternal(Client cntxt, int bskt)
+{
+       char buf[PATHLENGTH];
+       BAT *b;
+       int i;
+       str msg= MAL_SUCCEED;
+       FILE *f;
+       long fsize;
+       str dir = baskets[bskt].source;
+       str cname= NULL;
+
+       (void)cntxt;
+       // check access permission to directory first
+       if( access (dir , F_OK | R_OK)){
+               throw(SQL, "iot.basket", "Could not access the basket directory 
%s. error %d",dir,errno);
+       }
+       
+       /* check for leftover files */
+       for( i=0; i < MAXCOLS && baskets[bskt].cols[i]; i++){
+               cname = baskets[bskt].cols[i];
+               snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, cname);
+               _DEBUG_BASKET_ mnstr_printf(BSKTout,"Check for the file 
%s\n",buf);
+               if( !access (buf,R_OK))
+                       throw(MAL,"iot.export","Left over %s file %s\n",cname, 
buf);
+               b = baskets[bskt].bats[i];
+               if( b == 0)
+                       throw(MAL,"iot.export","Could not access the column 
%s\n",cname);
+       }
+
+       // types are already checked during stream initialization
+       MT_lock_set(&iotLock);
+       for( i=0; i < MAXCOLS && baskets[bskt].cols[i]; i++){
+               cname = baskets[bskt].cols[i];
+               snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, cname);
+               _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
+               f=  fopen(buf,"w");
+               if( f == NULL){
+                       msg= createException(MAL,"iot.export","Could not access 
the column %s file %s\n",cname, buf);
+                       break;
+               }
+               b = baskets[bskt].bats[i];
+               assert( b);
+
+               switch(ATOMstorage(b->ttype)){
+               case TYPE_bit:
+               case TYPE_bte:
+               case TYPE_sht:
+               case TYPE_int:
+               case TYPE_void:
+               case TYPE_oid:
+               case TYPE_flt:
+               case TYPE_dbl:
+               case TYPE_lng:
+#ifdef HAVE_HGE
+               case TYPE_hge:
+#endif
+                       /* append the binary partition */
+                       fsize = BATcount(b) * ATOMsize(b->ttype);
+                       if( fwrite(Tloc(b, BUNlast(b)),1,fsize, f) != (size_t) 
fsize){
+                               (void) fclose(f);
+                               msg= createException(MAL,"iot.export","Could 
not write complete basket file %s\n",baskets[bskt].cols[i]);
+                               goto recover;
+                       }
+               break;
+               case TYPE_str:
+                       msg= createException(MAL,"iot.export","Export type 
string not yet supported\n");
+                       break;
+               default:
+                       msg= createException(MAL,"iot.export","export type not 
yet supported\n");
+               }
+               (void) fclose(f);
+       }
+
+       /* reset all BATs when they are exported */
+       for( i=0; i < MAXCOLS && baskets[bskt].cols[i]; i++){
+               b = baskets[bskt].bats[i];
+               assert( b );
+               BATsetcount(b,0);
+       }
+
+recover:
+       MT_lock_unset(&iotLock);
+    return msg;
+}
+
+str 
+BSKTexport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    str sch = *getArgReference_str(stk, pci, 1);
+    str tbl = *getArgReference_str(stk, pci, 2);
+    str dir = *getArgReference_str(stk, pci, 3);
+    int bskt;
+
+       BSKTregisterInternal(cntxt, mb, sch, tbl);
+    bskt = BSKTlocate(sch,tbl);
+       if (bskt == 0)
+               throw(SQL, "iot.basket", "Could not find the basket 
%s.%s",sch,tbl);
+       baskets[bskt].source = GDKstrdup(dir);
+       return BSKTexportInternal(cntxt,bskt);
+}
+
 /* remove tuples from a basket according to the sliding policy */
 #define ColumnShift(B,TPE, STRIDE) { \
        TPE *first= (TPE*) Tloc(B, BUNfirst(B));\
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -93,6 +93,7 @@ iot_export str BSKTappend(Client cntxt, 
 iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTimportInternal(Client cntxt, int bskt);
 iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+iot_export str BSKTexport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTunlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/sql/backends/monet5/iot/iot.mal b/sql/backends/monet5/iot/iot.mal
--- a/sql/backends/monet5/iot/iot.mal
+++ b/sql/backends/monet5/iot/iot.mal
@@ -77,6 +77,10 @@ pattern iot.import(sch:str, tbl:str, dir
 address BSKTimport
 comment "Import a single directory with the binary files for a stream table";
 
+pattern iot.export(sch:str, tbl:str, dir:str):void
+address BSKTexport
+comment "Export a stream table to binary files;
+
 pattern iot.baskets()(sch:bat[:str],nme:bat[:str], status:bat[:str], 
threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], 
timeslice:bat[:int], 
 timestride:bat[:int], heartbeat:bat[:int], seen:bat[:timestamp], 
events:bat[:int])
 address BSKTtable
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -84,7 +84,7 @@ int pnettop = 0;
 int enabled[MAXPN];     /*array that contains the id's of all queries that are 
enable to fire*/
 
 static int pnstatus = PNINIT;
-static int cycleDelay = 500; /* be careful, it affects response/throughput 
timings */
+static int cycleDelay = 50; /* be careful, it affects response/throughput 
timings */
 
 str PNperiod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to