Changeset: e1a82a5485de for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e1a82a5485de
Added Files:
        clients/R/Tests/copy_into_fwf.R
        clients/R/Tests/copy_into_fwf.reqtests
        clients/R/Tests/copy_into_fwf.stable.err
        clients/R/Tests/copy_into_fwf.stable.out
Modified Files:
        clients/R/Tests/All
        clients/R/Tests/dbi.R
        clients/R/Tests/deps-test.R
        common/stream/stream.c
        common/stream/stream.h
        sql/backends/monet5/Makefile.ag
        sql/backends/monet5/sql.c
        sql/common/sql_types.c
        sql/server/rel_updates.c
        sql/server/sql_parser.y
        sql/server/sql_scan.c
Branch: default
Log Message:

COPY INTO from fixed width format files


diffs (truncated from 656 to 300 lines):

diff --git a/clients/R/Tests/All b/clients/R/Tests/All
--- a/clients/R/Tests/All
+++ b/clients/R/Tests/All
@@ -4,4 +4,5 @@ HAVE_LIBR?deps-test
 HAVE_LIBR?dbi
 HAVE_LIBR?dplyr
 HAVE_LIBR?dplyr-flights
+HAVE_LIBR?copy_into_fwf
 HAVE_LIBR&NOT_WIN32?dbapply
diff --git a/clients/R/Tests/copy_into_fwf.R b/clients/R/Tests/copy_into_fwf.R
new file mode 100644
--- /dev/null
+++ b/clients/R/Tests/copy_into_fwf.R
@@ -0,0 +1,35 @@
+# Test for COPY INTO ... FWF: dump the built-in mtcars data set to a
+# fixed-width text file and load it back through the SQL COPY statement.
+# Optional command line arguments: database port, then database name.
+if (Sys.getenv("TSTTRGDIR") != "") {
+       .libPaths(c(.libPaths(), paste0(Sys.getenv("TSTTRGDIR"),"/rlibdir")))
+}
+library(DBI, quietly = TRUE)
+
+args <- commandArgs(trailingOnly = TRUE)
+dbport <- 50000
+dbname <- "mTests_clients_R"
+if (length(args) > 0) 
+       dbport <- args[[1]]
+if (length(args) > 1) 
+       dbname <- args[[2]]
+
+con <- dbConnect(MonetDBLite::MonetDB(), port = dbport, dbname = dbname, wait = TRUE)
+stopifnot(dbIsValid(con))
+
+# fixed-width dump of mtcars without a header line
+tf <- tempfile()
+
+gdata::write.fwf(mtcars, tf, colnames = FALSE)
+
+if (dbExistsTable(con, "mtcars")) dbRemoveTable(con, "mtcars")
+
+# everything below runs in a transaction that is rolled back at the end
+dbBegin(con)
+dbSendQuery(con, "CREATE TABLE mtcars (mpg DOUBLE PRECISION, cyl DOUBLE PRECISION, disp DOUBLE PRECISION, hp DOUBLE PRECISION, drat DOUBLE PRECISION, wt DOUBLE PRECISION, qsec DOUBLE PRECISION, vs DOUBLE PRECISION, am DOUBLE PRECISION, gear DOUBLE PRECISION, carb DOUBLE PRECISION)")
+
+# delimiters are ineffective for FWF import; set them anyway to make sure they don't break anything
+res <- dbSendQuery(con, paste0("COPY OFFSET 1 INTO mtcars FROM '", tf, "' USING DELIMITERS 'a','b','c' NULL AS '' FWF (4, 2, 6, 4, 5, 6, 6, 2, 2, 2, 2)"))
+
+print(dbReadTable(con, "mtcars"))
+
+stopifnot(nrow(dbReadTable(con, "mtcars")) > 1)
+
+dbRollback(con)
+
+# the temporary input file is no longer needed; unlink() returns invisibly,
+# so the printed test output is unaffected
+unlink(tf)
+
+print("SUCCESS")
diff --git a/clients/R/Tests/copy_into_fwf.reqtests 
b/clients/R/Tests/copy_into_fwf.reqtests
new file mode 100644
--- /dev/null
+++ b/clients/R/Tests/copy_into_fwf.reqtests
@@ -0,0 +1,1 @@
+dbi
diff --git a/clients/R/Tests/copy_into_fwf.stable.err 
b/clients/R/Tests/copy_into_fwf.stable.err
new file mode 100644
--- /dev/null
+++ b/clients/R/Tests/copy_into_fwf.stable.err
@@ -0,0 +1,35 @@
+stderr of test 'copy_into_fwf` in directory 'clients/R` itself:
+
+
+# 16:40:22 >  
+# 16:40:22 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=30658" "--set" 
"mapi_usock=/var/tmp/mtest-16034/.s.monetdb.30658" "--set" "monet_prompt=" 
"--forcemito" "--dbpath=/tmp/fuckit/var/MonetDB/mTests_clients_R" "--set" 
"embedded_r=yes"
+# 16:40:22 >  
+
+# builtin opt  gdk_dbpath = /tmp/fuckit/var/monetdb5/dbfarm/demo
+# builtin opt  gdk_debug = 0
+# builtin opt  gdk_vmtrim = no
+# builtin opt  monet_prompt = >
+# builtin opt  monet_daemon = no
+# builtin opt  mapi_port = 50000
+# builtin opt  mapi_open = false
+# builtin opt  mapi_autosense = false
+# builtin opt  sql_optimizer = default_pipe
+# builtin opt  sql_debug = 0
+# cmdline opt  gdk_nr_threads = 0
+# cmdline opt  mapi_open = true
+# cmdline opt  mapi_port = 30658
+# cmdline opt  mapi_usock = /var/tmp/mtest-16034/.s.monetdb.30658
+# cmdline opt  monet_prompt = 
+# cmdline opt  gdk_dbpath = /tmp/fuckit/var/MonetDB/mTests_clients_R
+# cmdline opt  embedded_r = yes
+# cmdline opt  gdk_debug = 536870922
+
+# 16:40:23 >  
+# 16:40:23 >  "R" "--vanilla" "--slave" "--args" "30658"
+# 16:40:23 >  
+
+
+# 16:40:23 >  
+# 16:40:23 >  "Done."
+# 16:40:23 >  
+
diff --git a/clients/R/Tests/copy_into_fwf.stable.out 
b/clients/R/Tests/copy_into_fwf.stable.out
new file mode 100644
--- /dev/null
+++ b/clients/R/Tests/copy_into_fwf.stable.out
@@ -0,0 +1,71 @@
+stdout of test 'copy_into_fwf` in directory 'clients/R` itself:
+
+
+# 16:40:22 >  
+# 16:40:22 >  "mserver5" "--debug=10" "--set" "gdk_nr_threads=0" "--set" 
"mapi_open=true" "--set" "mapi_port=30658" "--set" 
"mapi_usock=/var/tmp/mtest-16034/.s.monetdb.30658" "--set" "monet_prompt=" 
"--forcemito" "--dbpath=/tmp/fuckit/var/MonetDB/mTests_clients_R" "--set" 
"embedded_r=yes"
+# 16:40:22 >  
+
+# MonetDB 5 server v11.24.0
+# This is an unreleased version
+# Serving database 'mTests_clients_R', using 4 threads
+# Compiled for x86_64-apple-darwin15.5.0/64bit with 64bit OIDs and 128bit 
integers dynamically linked
+# Found 16.000 GiB available main-memory.
+# Copyright (c) 1993-July 2008 CWI.
+# Copyright (c) August 2008-2016 MonetDB B.V., all rights reserved
+# Visit http://www.monetdb.org/ for further information
+# Listening for connection requests on mapi:monetdb://dakar.da.cwi.nl:30658/
+# Listening for UNIX domain connection requests on 
mapi:monetdb:///var/tmp/mtest-16034/.s.monetdb.30658
+# MonetDB/SQL module loaded
+# MonetDB/R   module loaded
+
+Ready.
+
+# 16:40:23 >  
+# 16:40:23 >  "R" "--vanilla" "--slave" "--args" "30658"
+# 16:40:23 >  
+
+[1] TRUE
+<MonetDBResult>
+  SQL  CREATE TABLE mtcars (mpg DOUBLE PRECISION, cyl DOUBLE PRECISION, disp 
DOUBLE PRECISION, hp DOUBLE PRECISION, drat DOUBLE PRECISION, wt DOUBLE 
PRECISION, qsec DOUBLE PRECISION, vs DOUBLE PRECISION, am DOUBLE PRECISION, 
gear DOUBLE PRECISION, carb DOUBLE PRECISION)
+  ROWS Fetched: 0 [complete]
+       Changed: NA
+    mpg cyl  disp  hp drat    wt  qsec vs am gear carb
+1  21.0   6 160.0 110 3.90 2.620 16.46  0  1    4    4
+2  21.0   6 160.0 110 3.90 2.875 17.02  0  1    4    4
+3  22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1
+4  21.4   6 258.0 110 3.08 3.215 19.44  1  0    3    1
+5  18.7   8 360.0 175 3.15 3.440 17.02  0  0    3    2
+6  18.1   6 225.0 105 2.76 3.460 20.22  1  0    3    1
+7  14.3   8 360.0 245 3.21 3.570 15.84  0  0    3    4
+8  24.4   4 146.7  62 3.69 3.190 20.00  1  0    4    2
+9  22.8   4 140.8  95 3.92 3.150 22.90  1  0    4    2
+10 19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
+11 17.8   6 167.6 123 3.92 3.440 18.90  1  0    4    4
+12 16.4   8 275.8 180 3.07 4.070 17.40  0  0    3    3
+13 17.3   8 275.8 180 3.07 3.730 17.60  0  0    3    3
+14 15.2   8 275.8 180 3.07 3.780 18.00  0  0    3    3
+15 10.4   8 472.0 205 2.93 5.250 17.98  0  0    3    4
+16 10.4   8 460.0 215 3.00 5.424 17.82  0  0    3    4
+17 14.7   8 440.0 230 3.23 5.345 17.42  0  0    3    4
+18 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
+19 30.4   4  75.7  52 4.93 1.615 18.52  1  1    4    2
+20 33.9   4  71.1  65 4.22 1.835 19.90  1  1    4    1
+21 21.5   4 120.1  97 3.70 2.465 20.01  1  0    3    1
+22 15.5   8 318.0 150 2.76 3.520 16.87  0  0    3    2
+23 15.2   8 304.0 150 3.15 3.435 17.30  0  0    3    2
+24 13.3   8 350.0 245 3.73 3.840 15.41  0  0    3    4
+25 19.2   8 400.0 175 3.08 3.845 17.05  0  0    3    2
+26 27.3   4  79.0  66 4.08 1.935 18.90  1  1    4    1
+27 26.0   4 120.3  91 4.43 2.140 16.70  0  1    5    2
+28 30.4   4  95.1 113 3.77 1.513 16.90  1  1    5    2
+29 15.8   8 351.0 264 4.22 3.170 14.50  0  1    5    4
+30 19.7   6 145.0 175 3.62 2.770 15.50  0  1    5    6
+31 15.0   8 301.0 335 3.54 3.570 14.60  0  1    5    8
+32 21.4   4 121.0 109 4.11 2.780 18.60  1  1    4    2
+[1] TRUE
+[1] "SUCCESS"
+
+# 16:40:23 >  
+# 16:40:23 >  "Done."
+# 16:40:23 >  
+
diff --git a/clients/R/Tests/dbi.R b/clients/R/Tests/dbi.R
--- a/clients/R/Tests/dbi.R
+++ b/clients/R/Tests/dbi.R
@@ -37,7 +37,7 @@ MonetDBLite::dbSendUpdate(con,"CREATE TA
 stopifnot(identical(dbExistsTable(con,tname),TRUE))
 MonetDBLite::dbSendUpdate(con,"INSERT INTO monetdbtest VALUES 
('one',1,'1111')")
 MonetDBLite::dbSendUpdate(con,"INSERT INTO monetdbtest VALUES 
('two',2,'22222222')")
-stopifnot(identical(dbGetQuery(con,"SELECT count(*) FROM 
monetdbtest")[[1]],2L))
+stopifnot(identical(dbGetQuery(con,"SELECT count(*) FROM monetdbtest")[[1]],2))
 
stopifnot(identical(dbReadTable(con,tname)[[3]],list(charToRaw("1111"),charToRaw("22222222"))))
 dbRemoveTable(con,tname)
 stopifnot(identical(dbExistsTable(con,tname),FALSE))
diff --git a/clients/R/Tests/deps-test.R b/clients/R/Tests/deps-test.R
--- a/clients/R/Tests/deps-test.R
+++ b/clients/R/Tests/deps-test.R
@@ -4,5 +4,5 @@ dd <- capture.output(suppressMessages(su
        repos <- 'http://cran.rstudio.com/'
        if(length(np)) install.packages(np, repos=repos, quiet=T)
        update.packages(repos=repos, ask=F, oldPkgs=lp, quiet=T)
-       })(c('Rcpp', 'dplyr', 'Lahman', 'nycflights13'))
+       })(c('Rcpp', 'dplyr', 'Lahman', 'nycflights13', 'gdata'))
 })))
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4671,3 +4671,143 @@ stream * stream_blackhole_create (void)
        s->access = ST_WRITE;
        return s;
 }
+
+
+/* fixed-width format streams */
+#define STREAM_FWF_NAME "fwf_ftw"
+
+typedef struct { // private data of a fixed-width-format stream (kept in stream_data.p)
+       stream *s; // wrapped stream the fixed-width lines are read from; closed by stream_fwf_close
+       // config
+       size_t num_fields; // number of fields in every input line
+       size_t *widths; // width in bytes of each field, num_fields entries; freed by stream_fwf_close
+       char filler; // padding character stripped from both ends of every field
+       // state
+       size_t line_len; // number of bytes requested from s for one input line
+       char* in_buf; // holds the raw input line currently being converted
+       char* out_buf; // holds the converted (delimited) form of that line
+       size_t out_buf_start; // offset in out_buf of the first byte not yet handed to the reader
+       size_t out_buf_remaining; // number of bytes in out_buf not yet handed to the reader
+} stream_fwf_data;
+
+
+/*
+ * Read callback of the fixed-width-format (FWF) stream.
+ *
+ * Input is consumed from the wrapped stream in chunks of line_len bytes.
+ * Each chunk is cut into num_fields fields of widths[i] bytes, leading and
+ * trailing filler characters are stripped from every field, and the result
+ * is re-emitted as delimited text: every field is followed by
+ * STREAM_FWF_FIELD_SEP and every line by STREAM_FWF_RECORD_SEP.  A
+ * STREAM_FWF_FIELD_SEP character inside a value is prefixed with
+ * STREAM_FWF_ESCAPE.
+ *
+ * Only elmsize == 1 is supported.  Returns the number of bytes stored in
+ * buf, which is smaller than cnt once the input is exhausted (a trailing
+ * chunk shorter than line_len is dropped).  Returns -1 when s is not an FWF
+ * stream or elmsize != 1, and the negative result of the wrapped stream's
+ * read callback when that fails.
+ *
+ * NOTE(review): assumes the widths add up to at most line_len and that
+ * out_buf can hold 2 * line_len + num_fields + 1 bytes (worst case: every
+ * input byte escaped); both are set up outside this function - confirm in
+ * stream_fwf_create.
+ */
+static ssize_t
+stream_fwf_read(stream *s, void *buf, size_t elmsize, size_t cnt)
+{
+       stream_fwf_data *fsd;
+       size_t to_write = cnt;
+       size_t buf_written = 0;
+       if (strcmp(s->name, STREAM_FWF_NAME) != 0 || elmsize != 1) {
+               return -1;
+       }
+       fsd = (stream_fwf_data*) s->stream_data.p;
+
+       while (to_write > 0) {
+               // input conversion
+               if (fsd->out_buf_remaining == 0) { // need to convert next line
+                       size_t field_idx, in_buf_pos = 0, out_buf_pos = 0;
+                       ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len);
+                       if (actually_read < (ssize_t) fsd->line_len) { // incomplete last line
+                               if (actually_read < 0) {
+                                       return actually_read; // this is an error
+                               }
+                               return (ssize_t) buf_written; // skip last line
+                       }
+                       for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) {
+                               // the field occupies the half-open range [val_start, val_end)
+                               char *val_start, *val_end;
+                               val_start = fsd->in_buf + in_buf_pos;
+                               in_buf_pos += fsd->widths[field_idx];
+                               val_end = fsd->in_buf + in_buf_pos;
+                               // strip filler without ever leaving the field: a field that
+                               // consists of filler only (or has width 0) becomes an empty value
+                               while (val_start < val_end && *val_start == fsd->filler) {
+                                       val_start++;
+                               }
+                               while (val_end > val_start && val_end[-1] == fsd->filler) {
+                                       val_end--;
+                               }
+                               while (val_start < val_end) {
+                                       if (*val_start == STREAM_FWF_FIELD_SEP) {
+                                               fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE;
+                                       }
+                                       fsd->out_buf[out_buf_pos++] = *val_start++;
+                               }
+                               fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP;
+                       }
+                       fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP;
+                       fsd->out_buf_remaining = out_buf_pos;
+                       fsd->out_buf_start = 0;
+               }
+
+               // now we know something is in output_buf so deliver it
+               if (fsd->out_buf_remaining <= to_write) {
+                       // the rest of the converted line fits: hand it over completely
+                       memcpy((char*) buf + buf_written, fsd->out_buf + fsd->out_buf_start, fsd->out_buf_remaining);
+                       to_write -= fsd->out_buf_remaining;
+                       buf_written += fsd->out_buf_remaining;
+                       fsd->out_buf_remaining = 0;
+               } else {
+                       // caller's buffer is full: keep the tail for the next call
+                       memcpy((char*) buf + buf_written, fsd->out_buf + fsd->out_buf_start, to_write);
+                       fsd->out_buf_start += to_write;
+                       fsd->out_buf_remaining -= to_write;
+                       buf_written += to_write;
+                       to_write = 0;
+               }
+       }
+       return (ssize_t) buf_written;
+}
+
+
+/*
+ * Close callback of the fixed-width-format stream: closes the wrapped
+ * stream and frees the field widths, both line buffers and the private
+ * data record.  A stream whose name is not STREAM_FWF_NAME is left
+ * untouched.  The stream object s itself is not released here.
+ */
+static void
+stream_fwf_close(stream *s)
+{
+       stream_fwf_data *state;
+
+       if (strcmp(s->name, STREAM_FWF_NAME) != 0) {
+               return;
+       }
+       state = (stream_fwf_data *) s->stream_data.p;
+       mnstr_close(state->s);
+       free(state->out_buf);
+       free(state->in_buf);
+       free(state->widths);
+       free(state);
+       // FIXME destroy(s);
+}
+
+stream*
+stream_fwf_create (stream *s, size_t num_fields, size_t *widths, char filler)
+{
+       stream *ns;
+       stream_fwf_data *fsd = malloc(sizeof(stream_fwf_data));
+       size_t i, out_buf_len;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to