Changeset: 7a7bd9024d74 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7a7bd9024d74
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        clients/mapilib/mapi.h
        common/stream/stream.c
        common/stream/stream.h
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_client.h
        monetdb5/mal/mal_function.c
        monetdb5/mal/mal_session.c
        monetdb5/mal/mal_session.h
        monetdb5/modules/mal/mal_mapi.c
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_result.c
        sql/backends/monet5/sql_result.h
Branch: protocol
Log Message:

compressed streams actually working for large results


diffs (truncated from 479 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -3300,17 +3300,8 @@ main(int argc, char **argv)
        }
 
        if (protocol) {
-               if (strcasecmp(protocol, "prot9") == 0) {
-                       mapi_set_protocol(mid, prot9);
-               }
-               else if (strcasecmp(protocol, "prot10") == 0) {
-                       mapi_set_protocol(mid, prot10);
-               }
-               else if (strcasecmp(protocol, "prot10compressed") == 0) {
-                       mapi_set_protocol(mid, prot10compressed);
-               }
-               else {
-                       fprintf(stderr, "invalid protocol name '%s'\n", 
protocol);
+               if (mapi_set_protocol(mid, protocol) != 0) {
+                       fprintf(stderr, "%s\n", mapi_error_str(mid));
                }
        }
 
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -5540,8 +5540,22 @@ mapi_get_active(Mapi mid)
        return mid->active;
 }
 
-void mapi_set_protocol(Mapi mid, protocol_version prot) {
-       mid->protocol = prot;
+MapiMsg mapi_set_protocol(Mapi mid, const char* protocol) {
+       if (strcasecmp(protocol, "prot9") == 0) {
+               mid->protocol = prot9;
+       }
+       else if (strcasecmp(protocol, "prot10") == 0) {
+               mid->protocol = prot10;
+       }
+       else if (strcasecmp(protocol, "prot10compressed") == 0) {
+               mid->protocol = prot10compressed;
+       }
+       else {
+               mapi_setError(mid, "invalid protocol name", 
"mapi_set_protocol", MERROR);
+               return -1;
+       }
+
+       return 0;
 }
 
 void mapi_set_blocksize(Mapi mid, size_t blocksize) {
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -129,13 +129,6 @@ typedef struct {           /* used by MAPI_DATETI
        unsigned int fraction;  /* in 1000 millionths of a second (10e-9) */
 } MapiDateTime;
 
-typedef enum {
-       protauto = 0,
-       prot9 = 1,
-       prot10 = 2,
-       prot10compressed = 3,
-} protocol_version;
-
 /* connection-oriented functions */
 mapi_export Mapi mapi_mapi(const char *host, int port, const char *username, 
const char *password, const char *lang, const char *dbname);
 mapi_export Mapi mapi_mapiuri(const char *url, const char *user, const char 
*pass, const char *lang);
@@ -240,7 +233,7 @@ mapi_export char *mapi_quote(const char 
 mapi_export char *mapi_unquote(char *msg);
 mapi_export MapiHdl mapi_get_active(Mapi mid);
 
-mapi_export void mapi_set_protocol(Mapi mid, protocol_version prot);
+mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot);
 mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize);
 
 #ifdef _MSC_VER
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -3977,6 +3977,7 @@ typedef struct bs2 {
        size_t nr;              /* how far we got in buf */
        size_t itotal;  /* amount available in current read block */
        size_t bufsiz;
+       size_t readpos;
        compression_method comp;
        char *compbuf;
        size_t compbufsiz;
@@ -4047,6 +4048,7 @@ bs2_write(stream *ss, const void *buf, s
                s->nr += n;
                todo -= n;
                buf = ((const char *) buf + n);
+               /* block is full, write it to the stream */
                if (s->nr == s->bufsiz) {
 
 #ifdef BSTREAM_DEBUG
@@ -4063,17 +4065,15 @@ bs2_write(stream *ss, const void *buf, s
                        }
 #endif
 
-
-                       /* block is full, write it to the stream */
-
                        writelen = s->nr;
                        blksize = s->nr;
                        writebuf = s->buf;
 
                        if (s->comp == COMPRESSION_SNAPPY) {
+                               snappy_status ret;
                                size_t compressed_length = s->compbufsiz;
-                               if (snappy_compress(s->buf, s->nr, s->compbuf, 
&compressed_length) != SNAPPY_OK) {
-                                       ss->errnr = -42;
+                               if ((ret = snappy_compress(s->buf, s->nr, 
s->compbuf, &compressed_length)) != SNAPPY_OK) {
+                                       ss->errnr = (int) ret;
                                        return -1;
                                }
                                writebuf = s->compbuf;
@@ -4137,8 +4137,9 @@ bs2_flush(stream *ss)
 
                if (s->nr > 0 && s->comp == COMPRESSION_SNAPPY) {
                        size_t compressed_length = s->compbufsiz;
-                       if (snappy_compress(s->buf, s->nr, s->compbuf, 
&compressed_length) != SNAPPY_OK) {
-                               ss->errnr = -42;
+                       snappy_status ret;
+                       if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, 
&compressed_length)) != SNAPPY_OK) {
+                               ss->errnr = (int) ret;
                                return -1;
                        }
                        writebuf = s->compbuf;
@@ -4221,22 +4222,30 @@ bs2_read(stream *ss, void *buf, size_t e
                s->itotal = (size_t) (blksize >> 1);    /* amount readable */
                /* store whether this was the last block or not */
                s->nr = blksize & 1;
-       }
-
-       if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
-               // read everything into the comp buf
-               size_t uncompressed_length = s->bufsiz;
-               ssize_t m = s->s->read(s->s, s->compbuf, 1, s->itotal);
-               if (m <= 0) {
-                       ss->errnr = s->s->errnr;
-                       return -1;
+
+
+               if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
+                       // read everything into the comp buf
+                       size_t uncompressed_length = s->bufsiz;
+                       size_t m = 0;
+                       snappy_status ret;
+
+                       while (m < s->itotal) {
+                               ssize_t bytes_read = 0;
+                               bytes_read = s->s->read(s->s, s->compbuf + m, 
1, s->itotal - m);
+                               if (bytes_read <= 0) {
+                                       ss->errnr = s->s->errnr;
+                                       return -1;
+                               }
+                               m += bytes_read;
+                       }
+                       if ((ret = snappy_uncompress(s->compbuf, s->itotal, 
s->buf, &uncompressed_length)) != SNAPPY_OK) {
+                               ss->errnr = (int) ret;
+                               return -1;
+                       }
+                       s->itotal = uncompressed_length;
+                       s->readpos = 0;
                }
-               // FIXME: err, we could have more bytes in the buffer than 
bufsiz and the uncompressed length could be even bigger. need another buffer.
-               if (snappy_uncompress(s->compbuf, s->itotal, s->buf, 
&uncompressed_length) != SNAPPY_OK) {
-                       ss->errnr = -42;
-                       return -1;
-               }
-               s->itotal = uncompressed_length;
        }
 
        /* Fill the caller's buffer. */
@@ -4246,11 +4255,13 @@ bs2_read(stream *ss, void *buf, size_t e
                 * read it */
                n = todo < s->itotal ? todo : s->itotal;
                if (s->comp == COMPRESSION_SNAPPY) {
-                       memcpy(buf, s->buf, n);
+                       memcpy(buf, s->buf + s->readpos, n);
                        buf = (void *) ((char *) buf + n);
                        cnt += n;
                        todo -= n;
+                       s->readpos += n;
                        s->itotal -= n;
+
                } else {
                        while (n > 0) {
                                ssize_t m = s->s->read(s->s, buf, 1, n);
@@ -4315,19 +4326,26 @@ bs2_read(stream *ss, void *buf, size_t e
 
                        if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) {
                                // read everything into the comp buf
-                               size_t uncompressed_length = 0;
-                               ssize_t m = s->s->read(s->s, s->compbuf, 1, 
s->itotal);
-                               if (m <= 0) {
-                                       ss->errnr = s->s->errnr;
-                                       return -1;
+                               size_t uncompressed_length = s->bufsiz;
+                               size_t m = 0;
+                               snappy_status ret;
+
+                               while (m < s->itotal) {
+                                       ssize_t bytes_read = 0;
+                                       bytes_read = s->s->read(s->s, 
s->compbuf + m, 1, s->itotal - m);
+                                       if (bytes_read <= 0) {
+                                               ss->errnr = s->s->errnr;
+                                               return -1;
+                                       }
+                                       m += bytes_read;
                                }
-                               if (snappy_uncompress(s->compbuf, s->itotal, 
s->buf, &uncompressed_length) != SNAPPY_OK) {
-                                       ss->errnr = -42;
+                               if ((ret = snappy_uncompress(s->compbuf, 
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
+                                       ss->errnr = (int) ret;
                                        return -1;
                                }
                                s->itotal = uncompressed_length;
+                               s->readpos = 0;
                        }
-
                }
        }
        /* if we got an empty block with the end-of-sequence marker
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -225,6 +225,13 @@ stream_export int isa_block_stream(strea
 stream_export stream* bs_stream(stream *s);
 
 typedef enum {
+       protauto = 0,
+       prot9 = 1,
+       prot10 = 2,
+       prot10compressed = 3,
+} protocol_version;
+
+typedef enum {
        COMPRESSION_NONE = 0,
        COMPRESSION_SNAPPY = 1
 } compression_method;
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -259,6 +259,8 @@ MCinitClientRecord(Client c, oid user, b
                        GDKfree(msg);
        }
 #endif
+       c->blocksize = BLOCK;
+       c->protocol = prot9;
        MT_sema_init(&c->s, 0, "Client->s");
        return c;
 }
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -179,6 +179,9 @@ typedef struct CLIENT {
        BAT *error_fld;
        BAT *error_msg;
        BAT *error_input;
+
+       size_t blocksize;
+       protocol_version protocol;
 } *Client, ClientRec;
 
 mal_export void    MCinit(void);
diff --git a/monetdb5/mal/mal_function.c b/monetdb5/mal/mal_function.c
--- a/monetdb5/mal/mal_function.c
+++ b/monetdb5/mal/mal_function.c
@@ -488,7 +488,7 @@ debugFunction(stream *fd, MalBlkPtr mb, 
                        if (p->token == REMsymbol)
                                mnstr_printf(fd,"%-40s\n",ps);
                        else {
-                               mnstr_printf(fd,"%-40s\t#[%d] ("BUNFMT") %s 
",ps, i, getRowCnt(mb,getArg(p,0)), (p->blk && p->blk->binding? 
p->blk->binding:""));
+                               mnstr_printf(fd,"%-40s\t#[%d] ("BUNFMT") %s 
",ps, i, getRowCnt(mb,getArg(p,0)), (p->blk? p->blk->binding:""));
                                for(j =0; j < p->retc; j++)
                                        mnstr_printf(fd,"%d ",getArg(p,j));
                                if( p->argc - p->retc > 0)
diff --git a/monetdb5/mal/mal_session.c b/monetdb5/mal/mal_session.c
--- a/monetdb5/mal/mal_session.c
+++ b/monetdb5/mal/mal_session.c
@@ -146,7 +146,7 @@ exit_streams( bstream *fin, stream *fout
 const char* mal_enableflag = "mal_for_all";
 
 void
-MSscheduleClient(str command, str challenge, bstream *fin, stream *fout)
+MSscheduleClient(str command, str challenge, bstream *fin, stream *fout, 
protocol_version protocol, size_t blocksize)
 {
        char *user = command, *algo = NULL, *passwd = NULL, *lang = NULL;
        char *database = NULL, *s, *dbname;
@@ -326,6 +326,10 @@ MSscheduleClient(str command, str challe
         * demand. */
 
        /* fork a new thread to handle this client */
+
+       c->protocol = protocol;
+       c->blocksize = blocksize;
+
        mnstr_settimeout(c->fdin->s, 50, GDKexiting);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to