Changeset: 7a7bd9024d74 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7a7bd9024d74 Modified Files: clients/mapiclient/mclient.c clients/mapilib/mapi.c clients/mapilib/mapi.h common/stream/stream.c common/stream/stream.h monetdb5/mal/mal_client.c monetdb5/mal/mal_client.h monetdb5/mal/mal_function.c monetdb5/mal/mal_session.c monetdb5/mal/mal_session.h monetdb5/modules/mal/mal_mapi.c sql/backends/monet5/sql.c sql/backends/monet5/sql_result.c sql/backends/monet5/sql_result.h Branch: protocol Log Message:
compressed streams actually working for large results diffs (truncated from 479 to 300 lines): diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c --- a/clients/mapiclient/mclient.c +++ b/clients/mapiclient/mclient.c @@ -3300,17 +3300,8 @@ main(int argc, char **argv) } if (protocol) { - if (strcasecmp(protocol, "prot9") == 0) { - mapi_set_protocol(mid, prot9); - } - else if (strcasecmp(protocol, "prot10") == 0) { - mapi_set_protocol(mid, prot10); - } - else if (strcasecmp(protocol, "prot10compressed") == 0) { - mapi_set_protocol(mid, prot10compressed); - } - else { - fprintf(stderr, "invalid protocol name '%s'\n", protocol); + if (mapi_set_protocol(mid, protocol) != 0) { + fprintf(stderr, "%s\n", mapi_error_str(mid)); } } diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -5540,8 +5540,22 @@ mapi_get_active(Mapi mid) return mid->active; } -void mapi_set_protocol(Mapi mid, protocol_version prot) { - mid->protocol = prot; +MapiMsg mapi_set_protocol(Mapi mid, const char* protocol) { + if (strcasecmp(protocol, "prot9") == 0) { + mid->protocol = prot9; + } + else if (strcasecmp(protocol, "prot10") == 0) { + mid->protocol = prot10; + } + else if (strcasecmp(protocol, "prot10compressed") == 0) { + mid->protocol = prot10compressed; + } + else { + mapi_setError(mid, "invalid protocol name", "mapi_set_protocol", MERROR); + return -1; + } + + return 0; } void mapi_set_blocksize(Mapi mid, size_t blocksize) { diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h --- a/clients/mapilib/mapi.h +++ b/clients/mapilib/mapi.h @@ -129,13 +129,6 @@ typedef struct { /* used by MAPI_DATETI unsigned int fraction; /* in 1000 millionths of a second (10e-9) */ } MapiDateTime; -typedef enum { - protauto = 0, - prot9 = 1, - prot10 = 2, - prot10compressed = 3, -} protocol_version; - /* connection-oriented functions */ mapi_export Mapi mapi_mapi(const char *host, int port, const char *username, const char *password, const char *lang, const char *dbname); mapi_export Mapi mapi_mapiuri(const char *url, const char *user, const char *pass, const char *lang); @@ -240,7 +233,7 @@ mapi_export char *mapi_quote(const char mapi_export char *mapi_unquote(char *msg); mapi_export MapiHdl mapi_get_active(Mapi mid); -mapi_export void mapi_set_protocol(Mapi mid, protocol_version prot); +mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot); mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize); #ifdef _MSC_VER diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -3977,6 +3977,7 @@ typedef struct bs2 { size_t nr; /* how far we got in buf */ size_t itotal; /* amount available in current read block */ size_t bufsiz; + size_t readpos; compression_method comp; char *compbuf; size_t compbufsiz; @@ -4047,6 +4048,7 @@ bs2_write(stream *ss, const void *buf, s s->nr += n; todo -= n; buf = ((const char *) buf + n); + /* block is full, write it to the stream */ if (s->nr == s->bufsiz) { #ifdef BSTREAM_DEBUG @@ -4063,17 +4065,15 @@ bs2_write(stream *ss, const void *buf, s } #endif - - /* block is full, write it to the stream */ - writelen = s->nr; blksize = s->nr; writebuf = s->buf; if (s->comp == COMPRESSION_SNAPPY) { + snappy_status ret; size_t compressed_length = s->compbufsiz; - if (snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length) != SNAPPY_OK) { - ss->errnr = -42; + if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length)) != SNAPPY_OK) { + ss->errnr = (int) ret; return -1; } writebuf = s->compbuf; @@ -4137,8 +4137,9 @@ bs2_flush(stream *ss) if (s->nr > 0 && s->comp == COMPRESSION_SNAPPY) { size_t compressed_length = s->compbufsiz; - if (snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length) != SNAPPY_OK) { - ss->errnr = -42; + snappy_status ret; + if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length)) != SNAPPY_OK) { + ss->errnr = (int) ret; return -1; } writebuf = s->compbuf; @@ -4221,22 +4222,30 @@ bs2_read(stream *ss, void *buf, size_t e s->itotal = (size_t) (blksize >> 1); /* amount readable */ /* store whether this was the last block or not */ s->nr = blksize & 1; - } - - if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) { - // read everything into the comp buf - size_t uncompressed_length = s->bufsiz; - ssize_t m = s->s->read(s->s, s->compbuf, 1, s->itotal); - if (m <= 0) { - ss->errnr = s->s->errnr; - return -1; + + + if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) { + // read everything into the comp buf + size_t uncompressed_length = s->bufsiz; + size_t m = 0; + snappy_status ret; + + while (m < s->itotal) { + ssize_t bytes_read = 0; + bytes_read = s->s->read(s->s, s->compbuf + m, 1, s->itotal - m); + if (bytes_read <= 0) { + ss->errnr = s->s->errnr; + return -1; + } + m += bytes_read; + } + if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { + ss->errnr = (int) ret; + return -1; + } + s->itotal = uncompressed_length; + s->readpos = 0; } - // FIXME: err, we could have more bytes in the buffer than bufsiz and the uncompressed length could be even bigger. need another buffer. - if (snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length) != SNAPPY_OK) { - ss->errnr = -42; - return -1; - } - s->itotal = uncompressed_length; } /* Fill the caller's buffer. */ @@ -4246,11 +4255,13 @@ bs2_read(stream *ss, void *buf, size_t e * read it */ n = todo < s->itotal ? todo : s->itotal; if (s->comp == COMPRESSION_SNAPPY) { - memcpy(buf, s->buf, n); + memcpy(buf, s->buf + s->readpos, n); buf = (void *) ((char *) buf + n); cnt += n; todo -= n; + s->readpos += n; s->itotal -= n; + } else { while (n > 0) { ssize_t m = s->s->read(s->s, buf, 1, n); @@ -4315,19 +4326,26 @@ bs2_read(stream *ss, void *buf, size_t e if (s->itotal > 0 && s->comp == COMPRESSION_SNAPPY) { // read everything into the comp buf - size_t uncompressed_length = 0; - ssize_t m = s->s->read(s->s, s->compbuf, 1, s->itotal); - if (m <= 0) { - ss->errnr = s->s->errnr; - return -1; + size_t uncompressed_length = s->bufsiz; + size_t m = 0; + snappy_status ret; + + while (m < s->itotal) { + ssize_t bytes_read = 0; + bytes_read = s->s->read(s->s, s->compbuf + m, 1, s->itotal - m); + if (bytes_read <= 0) { + ss->errnr = s->s->errnr; + return -1; + } + m += bytes_read; } - if (snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length) != SNAPPY_OK) { - ss->errnr = -42; + if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { + ss->errnr = (int) ret; return -1; } s->itotal = uncompressed_length; + s->readpos = 0; } - } } /* if we got an empty block with the end-of-sequence marker diff --git a/common/stream/stream.h b/common/stream/stream.h --- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -225,6 +225,13 @@ stream_export int isa_block_stream(strea stream_export stream* bs_stream(stream *s); typedef enum { + protauto = 0, + prot9 = 1, + prot10 = 2, + prot10compressed = 3, +} protocol_version; + +typedef enum { COMPRESSION_NONE = 0, COMPRESSION_SNAPPY = 1 } compression_method; diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -259,6 +259,8 @@ MCinitClientRecord(Client c, oid user, b GDKfree(msg); } #endif + c->blocksize = BLOCK; + c->protocol = prot9; MT_sema_init(&c->s, 0, "Client->s"); return c; } diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h --- a/monetdb5/mal/mal_client.h +++ b/monetdb5/mal/mal_client.h @@ -179,6 +179,9 @@ typedef struct CLIENT { BAT *error_fld; BAT *error_msg; BAT *error_input; + + size_t blocksize; + protocol_version protocol; } *Client, ClientRec; mal_export void MCinit(void); diff --git a/monetdb5/mal/mal_function.c b/monetdb5/mal/mal_function.c --- a/monetdb5/mal/mal_function.c +++ b/monetdb5/mal/mal_function.c @@ -488,7 +488,7 @@ debugFunction(stream *fd, MalBlkPtr mb, if (p->token == REMsymbol) mnstr_printf(fd,"%-40s\n",ps); else { - mnstr_printf(fd,"%-40s\t#[%d] ("BUNFMT") %s ",ps, i, getRowCnt(mb,getArg(p,0)), (p->blk && p->blk->binding? p->blk->binding:"")); + mnstr_printf(fd,"%-40s\t#[%d] ("BUNFMT") %s ",ps, i, getRowCnt(mb,getArg(p,0)), (p->blk? p->blk->binding:"")); for(j =0; j < p->retc; j++) mnstr_printf(fd,"%d ",getArg(p,j)); if( p->argc - p->retc > 0) diff --git a/monetdb5/mal/mal_session.c b/monetdb5/mal/mal_session.c --- a/monetdb5/mal/mal_session.c +++ b/monetdb5/mal/mal_session.c @@ -146,7 +146,7 @@ exit_streams( bstream *fin, stream *fout const char* mal_enableflag = "mal_for_all"; void -MSscheduleClient(str command, str challenge, bstream *fin, stream *fout) +MSscheduleClient(str command, str challenge, bstream *fin, stream *fout, protocol_version protocol, size_t blocksize) { char *user = command, *algo = NULL, *passwd = NULL, *lang = NULL; char *database = NULL, *s, *dbname; @@ -326,6 +326,10 @@ MSscheduleClient(str command, str challe * demand. */ /* fork a new thread to handle this client */ + + c->protocol = protocol; + c->blocksize = blocksize; + mnstr_settimeout(c->fdin->s, 50, GDKexiting); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list