Changeset: a2e6bf7213e6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a2e6bf7213e6
Modified Files:
        clients/mapilib/mapi.c
        common/stream/Makefile.ag
        common/stream/stream.c
        common/stream/stream.h
        configure.ag
        monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:

mapi version 10, bigger and snappier blocks


diffs (truncated from 894 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2214,6 +2214,8 @@ mapi_reconnect(Mapi mid)
        char *protover;
        char *rest;
        protocol_version prot_version = prot9;
+       // FIXME: make this configurable
+       size_t block_size = 1024000;
 
        if (mid->connected)
                close_connection(mid);
@@ -2732,7 +2734,7 @@ mapi_reconnect(Mapi mid)
                        if (prot_version == prot10 || prot_version == 
prot10compressed) {
                                // if we are using protocol 10, we have to send 
either PROT10/PROT10COMPRESSED to the server
                                // so the server knows which protocol to use
-                               retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:%s:\n",
+                               retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:%s:%zu:\n",
        #ifdef WORDS_BIGENDIAN
                                     "BIG",
        #else
@@ -2740,7 +2742,8 @@ mapi_reconnect(Mapi mid)
        #endif
                                     mid->username, hash, mid->language,
                                     mid->database == NULL ? "" : mid->database,
-                                    prot_version == prot10 ? "PROT10" : 
"PROT10COMPRESSED");
+                                    prot_version == prot10 ? "PROT10" : 
"PROT10COMPRESSED",
+                                   block_size);
                        } else {
                                retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:\n",
        #ifdef WORDS_BIGENDIAN
@@ -2781,15 +2784,13 @@ mapi_reconnect(Mapi mid)
        check_stream(mid, mid->to, "Could not send initial byte sequence", 
"mapi_reconnect", mid->error);
 
        if (prot_version == prot10 || prot_version == prot10compressed) {
-               bstream *bs_to = (bstream*) mid->to;
-               bstream *bs_from = (bstream*) mid->from;
 
                printf("Using protocol version %s.\n", prot_version == prot10  
? "PROT10" : "PROT10COMPRESSED");
                assert(isa_block_stream(mid->to));
                assert(isa_block_stream(mid->from));
 
                if (prot_version == prot10compressed) {
-#ifdef HAVE_LIBSNAPPY
+#ifdef HAVE_LIBSNAPPY2
                        mid->to = compressed_stream(bs_to->s, 
COMPRESSION_SNAPPY);
                        mid->from = compressed_stream(bs_from->s, 
COMPRESSION_SNAPPY);
 #else
@@ -2797,14 +2798,15 @@ mapi_reconnect(Mapi mid)
 #endif
                } else {
                        // FIXME: figure out proper stream sizes
-                       mid->to = byte_stream(bs_to->s, 1024000);
-                       mid->from = byte_stream(bs_from->s, 1024000);
-
+                       mid->to = block_stream2(bs_stream(mid->to), block_size);
+                       mid->from = block_stream2(bs_stream(mid->from), 
block_size);
                }
-               bs_to->s = NULL;
-               bs_from->s = NULL;
-               close_stream((stream*) bs_to);
-               close_stream((stream*) bs_from);
+
+               // FIXME: this leaks
+//             bs_to->s = NULL;
+//             bs_from->s = NULL;
+//             close_stream((stream*) bs_to);
+//             close_stream((stream*) bs_from);
 
        }
 
diff --git a/common/stream/Makefile.ag b/common/stream/Makefile.ag
--- a/common/stream/Makefile.ag
+++ b/common/stream/Makefile.ag
@@ -10,6 +10,7 @@ MTSAFE
 
 INCLUDES = $(zlib_CFLAGS) \
                   $(BZ_CFLAGS) \
+                  $(snappy_CFLAGS) \
                   $(liblzma_CFLAGS) \
                   $(openssl_CFLAGS) \
                   $(curl_CFLAGS)
@@ -20,6 +21,7 @@ lib_stream  =  {
        LIBS = $(SOCKET_LIBS) \
                   $(zlib_LIBS) \
                   $(BZ_LIBS) \
+                  $(snappy_LIBS) \
                   $(liblzma_LIBS) \
                   $(openssl_LIBS) \
                   $(curl_LIBS) \
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -3567,6 +3567,8 @@ buffer_wastream(buffer *b, const char *n
        return s;
 }
 
+
+
 /* ------------------------------------------------------------------ */
 
 /* A buffered stream consists of a sequence of blocks.  Each block
@@ -3885,6 +3887,20 @@ bs_isalive(stream *ss)
 }
 
 static void
+bs_close(stream *ss)
+{
+       bs *s;
+
+       s = (bs *) ss->stream_data.p;
+       assert(s);
+       if (s == NULL)
+               return;
+       assert(s->s);
+       if (s->s)
+               s->s->close(s->s);
+}
+
+static void
 bs_destroy(stream *ss)
 {
        bs *s;
@@ -3900,24 +3916,6 @@ bs_destroy(stream *ss)
        destroy(ss);
 }
 
-
-
-static void
-bs_close(stream *ss)
-{
-       bs *s;
-
-       s = (bs *) ss->stream_data.p;
-       assert(s);
-       if (s == NULL)
-               return;
-       assert(s->s);
-       if (s->s)
-               s->s->close(s->s);
-       bs_destroy(ss);
-}
-
-
 static void
 bs_clrerr(stream *s)
 {
@@ -3925,6 +3923,10 @@ bs_clrerr(stream *s)
                mnstr_clearerr(((bs *) s->stream_data.p)->s);
 }
 
+stream* bs_stream(stream *s) {
+       assert(isa_block_stream(s));
+       return ((bs*)s->stream_data.p)->s;
+}
 
 // FIXME: patch bs_read/bs_write etc
 // 10 MB max buffer size or so
@@ -3966,208 +3968,308 @@ block_stream(stream *s)
        return ns;
 }
 
+
+typedef struct bs2 {
+       stream *s;              /* underlying stream */
+       size_t nr;              /* how far we got in buf */
+       size_t itotal;  /* amount available in current read block */
+       size_t bufsiz;
+       char buf[0];    /* the buffered data (minus the size of
+                                * size-short */
+
+} bs2;
+
+static bs2 *
+bs2_create(stream *s, size_t bufsiz)
+{
+       /* should be a binary stream */
+       bs2 *ns;
+
+       if ((ns = malloc(sizeof(*ns) + bufsiz)) == NULL)
+               return NULL;
+       ns->s = s;
+       ns->nr = 0;
+       ns->itotal = 0;
+       ns->bufsiz = bufsiz;
+       return ns;
+}
+
+/* Collect data until the internal buffer is filled, then write the
+ * filled buffer to the underlying stream.
+ * Struct field usage:
+ * s - the underlying stream;
+ * buf - the buffer in which data is collected;
+ * nr - how much of buf is already filled (if nr == sizeof(buf) the
+ *      data is written to the underlying stream, so upon entry nr <
+ *      sizeof(buf));
+ * itotal - unused.
+ */
+static ssize_t
+bs2_write(stream *ss, const void *buf, size_t elmsize, size_t cnt)
+{
+       bs2 *s;
+       size_t todo = cnt * elmsize;
+       lng blksize;
+
+       s = (bs2 *) ss->stream_data.p;
+       if (s == NULL)
+               return -1;
+       assert(ss->access == ST_WRITE);
+       assert(s->nr < s->bufsiz);
+       while (todo > 0) {
+               size_t n = s->bufsiz - s->nr;
+
+               if (todo < n)
+                       n = todo;
+               memcpy(s->buf + s->nr, buf, n);
+               s->nr += n;
+               todo -= n;
+               buf = ((const char *) buf + n);
+               if (s->nr == s->bufsiz) {
+
+#ifdef BSTREAM_DEBUG
+                       {
+                               size_t i;
+
+                               fprintf(stderr, "W %s %lu \"", ss->name, s->nr);
+                               for (i = 0; i < s->nr; i++)
+                                       if (' ' <= s->buf[i] && s->buf[i] < 127)
+                                               putc(s->buf[i], stderr);
+                                       else
+                                               fprintf(stderr, "\\%03o", 
s->buf[i]);
+                               fprintf(stderr, "\"\n");
+                       }
+#endif
+
+
+                       /* block is full, write it to the stream */
+
+                       /* since the block is at max BLOCK (8K) - 2 size we can
+                        * store it in a two byte integer */
+                       blksize = s->nr;
+                       /* the last bit tells whether a flush is in there, it's 
not
+                        * at this moment, so shift it to the left */
+                       blksize <<= 1;
+
+                       if (!mnstr_writeLng(s->s, blksize) || s->s->write(s->s, 
s->buf, 1, s->nr) != (ssize_t) s->nr) {
+                               ss->errnr = MNSTR_WRITE_ERROR;
+                               return -1;
+                       }
+                       s->nr = 0;
+               }
+       }
+       return (ssize_t) cnt;
+}
+
+/* If the internal buffer is partially filled, write it to the
+ * underlying stream.  Then in any case write an empty buffer to the
+ * underlying stream to indicate to the receiver that the data was
+ * flushed.
+ */
+static int
+bs2_flush(stream *ss)
+{
+       lng blksize;
+       bs2 *s;
+
+       s = (bs2 *) ss->stream_data.p;
+       if (s == NULL)
+               return -1;
+       assert(ss->access == ST_WRITE);
+       assert(s->nr < s->bufsiz);
+       if (ss->access == ST_WRITE) {
+               /* flush the rest of buffer (if s->nr > 0), then set the
+                * last bit to 1 to to indicate user-instigated flush */
+#ifdef BSTREAM_DEBUG
+               if (s->nr > 0) {
+                       size_t i;
+
+                       fprintf(stderr, "W %s %lu \"", ss->name, s->nr);
+                       for (i = 0; i < s->nr; i++)
+                               if (' ' <= s->buf[i] && s->buf[i] < 127)
+                                       putc(s->buf[i], stderr);
+                               else
+                                       fprintf(stderr, "\\%03o", s->buf[i]);
+                       fprintf(stderr, "\"\n");
+                       fprintf(stderr, "W %s 0\n", ss->name);
+               }
+#endif
+               blksize = ((lng) s->nr) << 1;
+               /* indicate that this is the last buffer of a block by
+                * setting the low-order bit */
+               blksize |= 1;
+               /* always flush (even empty blocks) needed for the protocol) */
+
+               if ((!mnstr_writeLng(s->s, blksize) ||
+                    (s->nr > 0 &&
+                     s->s->write(s->s, s->buf, 1, s->nr) != (ssize_t) s->nr))) 
{
+                       ss->errnr = MNSTR_WRITE_ERROR;
+                       return -1;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to