Changeset: a2e6bf7213e6 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a2e6bf7213e6 Modified Files: clients/mapilib/mapi.c common/stream/Makefile.ag common/stream/stream.c common/stream/stream.h configure.ag monetdb5/modules/mal/mal_mapi.c Branch: protocol Log Message:
mapi version 10, bigger and snappier blocks diffs (truncated from 894 to 300 lines): diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c --- a/clients/mapilib/mapi.c +++ b/clients/mapilib/mapi.c @@ -2214,6 +2214,8 @@ mapi_reconnect(Mapi mid) char *protover; char *rest; protocol_version prot_version = prot9; + // FIXME: make this configurable + size_t block_size = 1024000; if (mid->connected) close_connection(mid); @@ -2732,7 +2734,7 @@ mapi_reconnect(Mapi mid) if (prot_version == prot10 || prot_version == prot10compressed) { // if we are using protocol 10, we have to send either PROT10/PROT10COMPRESSED to the server // so the server knows which protocol to use - retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:%s:\n", + retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:%s:%zu:\n", #ifdef WORDS_BIGENDIAN "BIG", #else @@ -2740,7 +2742,8 @@ mapi_reconnect(Mapi mid) #endif mid->username, hash, mid->language, mid->database == NULL ? "" : mid->database, - prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED"); + prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED", + block_size); } else { retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:\n", #ifdef WORDS_BIGENDIAN @@ -2781,15 +2784,13 @@ mapi_reconnect(Mapi mid) check_stream(mid, mid->to, "Could not send initial byte sequence", "mapi_reconnect", mid->error); if (prot_version == prot10 || prot_version == prot10compressed) { - bstream *bs_to = (bstream*) mid->to; - bstream *bs_from = (bstream*) mid->from; printf("Using protocol version %s.\n", prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED"); assert(isa_block_stream(mid->to)); assert(isa_block_stream(mid->from)); if (prot_version == prot10compressed) { -#ifdef HAVE_LIBSNAPPY +#ifdef HAVE_LIBSNAPPY2 mid->to = compressed_stream(bs_to->s, COMPRESSION_SNAPPY); mid->from = compressed_stream(bs_from->s, COMPRESSION_SNAPPY); #else @@ -2797,14 +2798,15 @@ mapi_reconnect(Mapi mid) #endif } else { // FIXME: figure out proper stream sizes - mid->to = byte_stream(bs_to->s, 1024000); - mid->from = byte_stream(bs_from->s, 1024000); - + mid->to = block_stream2(bs_stream(mid->to), block_size); + mid->from = block_stream2(bs_stream(mid->from), block_size); } - bs_to->s = NULL; - bs_from->s = NULL; - close_stream((stream*) bs_to); - close_stream((stream*) bs_from); + + // FIXME: this leaks +// bs_to->s = NULL; +// bs_from->s = NULL; +// close_stream((stream*) bs_to); +// close_stream((stream*) bs_from); } diff --git a/common/stream/Makefile.ag b/common/stream/Makefile.ag --- a/common/stream/Makefile.ag +++ b/common/stream/Makefile.ag @@ -10,6 +10,7 @@ MTSAFE INCLUDES = $(zlib_CFLAGS) \ $(BZ_CFLAGS) \ + $(snappy_CFLAGS) \ $(liblzma_CFLAGS) \ $(openssl_CFLAGS) \ $(curl_CFLAGS) @@ -20,6 +21,7 @@ lib_stream = { LIBS = $(SOCKET_LIBS) \ $(zlib_LIBS) \ $(BZ_LIBS) \ + $(snappy_LIBS) \ $(liblzma_LIBS) \ $(openssl_LIBS) \ $(curl_LIBS) \ diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -3567,6 +3567,8 @@ buffer_wastream(buffer *b, const char *n return s; } + + /* ------------------------------------------------------------------ */ /* A buffered stream consists of a sequence of blocks. Each block @@ -3885,6 +3887,20 @@ bs_isalive(stream *ss) } static void +bs_close(stream *ss) +{ + bs *s; + + s = (bs *) ss->stream_data.p; + assert(s); + if (s == NULL) + return; + assert(s->s); + if (s->s) + s->s->close(s->s); +} + +static void bs_destroy(stream *ss) { bs *s; @@ -3900,24 +3916,6 @@ bs_destroy(stream *ss) destroy(ss); } - - -static void -bs_close(stream *ss) -{ - bs *s; - - s = (bs *) ss->stream_data.p; - assert(s); - if (s == NULL) - return; - assert(s->s); - if (s->s) - s->s->close(s->s); - bs_destroy(ss); -} - - static void bs_clrerr(stream *s) { @@ -3925,6 +3923,10 @@ bs_clrerr(stream *s) mnstr_clearerr(((bs *) s->stream_data.p)->s); } +stream* bs_stream(stream *s) { + assert(isa_block_stream(s)); + return ((bs*)s->stream_data.p)->s; +} // FIXME: patch bs_read/bs_write etc // 10 MB max buffer size or so @@ -3966,208 +3968,308 @@ block_stream(stream *s) return ns; } + +typedef struct bs2 { + stream *s; /* underlying stream */ + size_t nr; /* how far we got in buf */ + size_t itotal; /* amount available in current read block */ + size_t bufsiz; + char buf[0]; /* the buffered data (minus the size of + * size-short */ + +} bs2; + +static bs2 * +bs2_create(stream *s, size_t bufsiz) +{ + /* should be a binary stream */ + bs2 *ns; + + if ((ns = malloc(sizeof(*ns) + bufsiz)) == NULL) + return NULL; + ns->s = s; + ns->nr = 0; + ns->itotal = 0; + ns->bufsiz = bufsiz; + return ns; +} + +/* Collect data until the internal buffer is filled, then write the + * filled buffer to the underlying stream. + * Struct field usage: + * s - the underlying stream; + * buf - the buffer in which data is collected; + * nr - how much of buf is already filled (if nr == sizeof(buf) the + * data is written to the underlying stream, so upon entry nr < + * sizeof(buf)); + * itotal - unused. + */ +static ssize_t +bs2_write(stream *ss, const void *buf, size_t elmsize, size_t cnt) +{ + bs2 *s; + size_t todo = cnt * elmsize; + lng blksize; + + s = (bs2 *) ss->stream_data.p; + if (s == NULL) + return -1; + assert(ss->access == ST_WRITE); + assert(s->nr < s->bufsiz); + while (todo > 0) { + size_t n = s->bufsiz - s->nr; + + if (todo < n) + n = todo; + memcpy(s->buf + s->nr, buf, n); + s->nr += n; + todo -= n; + buf = ((const char *) buf + n); + if (s->nr == s->bufsiz) { + +#ifdef BSTREAM_DEBUG + { + size_t i; + + fprintf(stderr, "W %s %lu \"", ss->name, s->nr); + for (i = 0; i < s->nr; i++) + if (' ' <= s->buf[i] && s->buf[i] < 127) + putc(s->buf[i], stderr); + else + fprintf(stderr, "\\%03o", s->buf[i]); + fprintf(stderr, "\"\n"); + } +#endif + + + /* block is full, write it to the stream */ + + /* since the block is at max BLOCK (8K) - 2 size we can + * store it in a two byte integer */ + blksize = s->nr; + /* the last bit tells whether a flush is in there, it's not + * at this moment, so shift it to the left */ + blksize <<= 1; + + if (!mnstr_writeLng(s->s, blksize) || s->s->write(s->s, s->buf, 1, s->nr) != (ssize_t) s->nr) { + ss->errnr = MNSTR_WRITE_ERROR; + return -1; + } + s->nr = 0; + } + } + return (ssize_t) cnt; +} + +/* If the internal buffer is partially filled, write it to the + * underlying stream. Then in any case write an empty buffer to the + * underlying stream to indicate to the receiver that the data was + * flushed. + */ +static int +bs2_flush(stream *ss) +{ + lng blksize; + bs2 *s; + + s = (bs2 *) ss->stream_data.p; + if (s == NULL) + return -1; + assert(ss->access == ST_WRITE); + assert(s->nr < s->bufsiz); + if (ss->access == ST_WRITE) { + /* flush the rest of buffer (if s->nr > 0), then set the + * last bit to 1 to to indicate user-instigated flush */ +#ifdef BSTREAM_DEBUG + if (s->nr > 0) { + size_t i; + + fprintf(stderr, "W %s %lu \"", ss->name, s->nr); + for (i = 0; i < s->nr; i++) + if (' ' <= s->buf[i] && s->buf[i] < 127) + putc(s->buf[i], stderr); + else + fprintf(stderr, "\\%03o", s->buf[i]); + fprintf(stderr, "\"\n"); + fprintf(stderr, "W %s 0\n", ss->name); + } +#endif + blksize = ((lng) s->nr) << 1; + /* indicate that this is the last buffer of a block by + * setting the low-order bit */ + blksize |= 1; + /* always flush (even empty blocks) needed for the protocol) */ + + if ((!mnstr_writeLng(s->s, blksize) || + (s->nr > 0 && + s->s->write(s->s, s->buf, 1, s->nr) != (ssize_t) s->nr))) { + ss->errnr = MNSTR_WRITE_ERROR; + return -1; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list