Changeset: b60c634a07cd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b60c634a07cd
Modified Files:
        clients/mapilib/mapi.c
        common/stream/stream.c
        common/stream/stream.h
Branch: protocol
Log Message:

Additional changes to byte_stream.


diffs (truncated from 483 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2781,15 +2781,13 @@ mapi_reconnect(Mapi mid)
        check_stream(mid, mid->to, "Could not send initial byte sequence", "mapi_reconnect", mid->error);
 
        if (prot_version == prot10 || prot_version == prot10compressed) {
+               bstream *bs_to = (bstream*) mid->to;
+               bstream *bs_from = (bstream*) mid->from;
+
                printf("Using protocol version %s.\n", prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED");
-               // FIXME: destroy block streams and replace with appropriate streams
-#if 0
                assert(isa_block_stream(mid->to));
                assert(isa_block_stream(mid->from));
 
-               bs *bs_to = (bs*) mid->to;
-               bs *bs_from = (bs*) mid->from;
-
                if (prot_version == prot10compressed) {
 #ifdef HAVE_LIBSNAPPY
                        mid->to = compressed_stream(bs_to->s, COMPRESSION_SNAPPY);
@@ -2798,13 +2796,16 @@ mapi_reconnect(Mapi mid)
                        assert(0);
 #endif
                } else {
-                       mid->to = byte_stream((bs_to->s);
-                       mid->from = byte_stream(bs_from->s);
+                       // FIXME: figure out proper stream sizes
+                       mid->to = byte_stream(bs_to->s, 1024000);
+                       mid->from = byte_stream(bs_from->s, 1024000);
+
                }
-
-               close_stream(bs_to);
-               close_stream(bs_from);
-#endif
+               bs_to->s = NULL;
+               bs_from->s = NULL;
+               close_stream((stream*) bs_to);
+               close_stream((stream*) bs_from);
+
        }
 
        /* consume the welcome message from the server */
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -3885,20 +3885,6 @@ bs_isalive(stream *ss)
 }
 
 static void
-bs_close(stream *ss)
-{
-       bs *s;
-
-       s = (bs *) ss->stream_data.p;
-       assert(s);
-       if (s == NULL)
-               return;
-       assert(s->s);
-       if (s->s)
-               s->s->close(s->s);
-}
-
-static void
 bs_destroy(stream *ss)
 {
        bs *s;
@@ -3914,6 +3900,24 @@ bs_destroy(stream *ss)
        destroy(ss);
 }
 
+
+
+static void
+bs_close(stream *ss)
+{
+       bs *s;
+
+       s = (bs *) ss->stream_data.p;
+       assert(s);
+       if (s == NULL)
+               return;
+       assert(s->s);
+       if (s->s)
+               s->s->close(s->s);
+       bs_destroy(ss);
+}
+
+
 static void
 bs_clrerr(stream *s)
 {
@@ -3984,8 +3988,9 @@ bytestream_create(stream *s, size_t bufs
                return NULL;
        ns->s = s;
        ns->bufsize = bufsize;
-       ns->bufpos = 0;
+       ns->bufpos = BYTESTREAM_OVERHEAD;
        ns->buf = malloc(bufsize);
+       ns->bufend = ns->bufpos;
        if (ns->buf == NULL) {
                free(ns);
                return NULL;
@@ -4003,20 +4008,17 @@ bytestream_flush(stream *ss)
                return -1;
        assert(ss->access == ST_WRITE);
        assert(s->bufpos < s->bufsize);
-       if (ss->access == ST_WRITE) {
-               if (s->bufpos + BYTESTREAM_OVERHEAD <= s->bufsize) {
-                       // if there is room in the buffer, end the buffer with a 0 so the reader knows that the stream has ended
-                       lng data = 0;
-                       memcpy(s->buf + s->bufpos, &data, sizeof(lng));
-                       s->bufpos += sizeof(lng);
-               }
-               // FIXME: for compressed stream we can compress the buffer here before writing it
-               if (!s->s->write(s->s, s->buf, 1, s->bufpos)) {
-                       ss->errnr = MNSTR_WRITE_ERROR;
-                       return -1;
-               }
-               s->bufpos = 0;
+
+       memcpy(s->buf, &s->bufpos, sizeof(size_t));
+
+
+       // FIXME: for compressed stream we can compress the buffer here before writing it
+       if (!s->s->write(s->s, s->buf, 1, s->bufpos)) {
+               ss->errnr = MNSTR_WRITE_ERROR;
+               return -1;
        }
+       s->bufpos = BYTESTREAM_OVERHEAD;
+
        return 0;
 }
 
@@ -4031,227 +4033,125 @@ bytestream_write(stream *ss, const void 
                return -1;
        assert(ss->access == ST_WRITE);
        assert(s->bufpos < s->bufsize);
-       if (todo + BYTESTREAM_OVERHEAD > s->bufsize)
+       if (todo > s->bufsize)
                fprintf(stderr, "Content too big for buffer!\n");
                ss->errnr = MNSTR_WRITE_ERROR;
                return -1; // content does not fit into buffer
 
-       if (todo + BYTESTREAM_OVERHEAD > s->bufsize - s->bufpos) {
+       if (todo > s->bufsize - s->bufpos) {
                // content does not fit into buffer currently, but will if we flush it first
-               bytestream_flush(s);
+               bytestream_flush(ss);
        }
 
-
-       // write the length of the package
-       memcpy(s->buf + s->bufpos, &todo, sizeof(size_t));
-       s->bufpos += sizeof(size_t);
        // write the actual data into the buffer
        memcpy(s->buf + s->bufpos, buf, todo);
        s->bufpos += todo;
 
-       if (s->bufpos + BYTESTREAM_OVERHEAD >= s->bufsize) {
+       if (s->bufpos >= s->bufsize) {
                // we cannot fit any more packages in here
-               bytestream_flush(s);
+               bytestream_flush(ss);
        }
        return (ssize_t) cnt;
 }
 
 
 
-static ssize_t
+ssize_t
 bytestream_read(stream *ss, void *buf, size_t elmsize, size_t cnt)
 {
        bytestream *s;
        size_t todo = cnt * elmsize;
-       size_t n;
-
-       s = (bs *) ss->stream_data.p;
+       lng size;
+
+       s = (bytestream *) ss->stream_data.p;
        if (s == NULL)
                return -1;
 
-       // FIXME: reading
-
-//     assert(ss->access == ST_READ);
-
-//     lng size = mnstr_readLng(ss, &size);
-//     s->s->read(s->s, buf, 1, size);
-//     if (s->itotal == 0) {
-//             short blksize = 0;
-
-//             if (s->nr) {
-//                     /* We read the closing block but hadn't
-//                      * returned that yet. Return it now, and note
-//                      * that we did by setting s->nr to 0. */
-//                     assert(s->nr == 1);
-//                     s->nr = 0;
-//                     return 0;
-//             }
-
-//             assert(s->nr == 0);
-
-//             /* There is nothing more to read in the current block,
-//              * so read the count for the next block */
-//             switch (mnstr_readSht(s->s, &blksize)) {
-//             case -1:
-//                     ss->errnr = s->s->errnr;
-//                     return -1;
-//             case 0:
-//                     return 0;
-//             case 1:
-//                     break;
-//             }
-//             if (blksize < 0) {
-//                     ss->errnr = MNSTR_READ_ERROR;
-//                     return -1;
-//             }
-// #ifdef BSTREAM_DEBUG
-//             fprintf(stderr, "RC size: %d, final: %s\n", blksize >> 1, blksize & 1 ? "true" : "false");
-//             fprintf(stderr, "RC %s %d\n", ss->name, blksize);
-// #endif
-//             s->itotal = (unsigned) (blksize >> 1);  /* amount readable */
-//             /* store whether this was the last block or not */
-//             s->nr = blksize & 1;
-//             s->bytes += s->itotal;
-//             s->blks++;
-//     }
-
-//     /* Fill the caller's buffer. */
-//     cnt = 0;                /* count how much we put into the buffer */
-//     while (todo > 0) {
-//             /* there is more data waiting in the current block, so
-//              * read it */
-//             n = todo < s->itotal ? todo : s->itotal;
-//             while (n > 0) {
-//                     ssize_t m = s->s->read(s->s, buf, 1, n);
-
-//                     if (m <= 0) {
-//                             ss->errnr = s->s->errnr;
-//                             return -1;
-//                     }
-// #ifdef BSTREAM_DEBUG
-//                     {
-//                             ssize_t i;
-
-//                             fprintf(stderr, "RD %s %zd \"", ss->name, m);
-//                             for (i = 0; i < m; i++)
-//                                     if (' ' <= ((char *) buf)[i] && ((char *) buf)[i] < 127)
-//                                             putc(((char *) buf)[i], stderr);
-//                                     else
-//                                             fprintf(stderr, "\\%03o", ((char *) buf)[i]);
-//                             fprintf(stderr, "\"\n");
-//                     }
-// #endif
-//                     buf = (void *) ((char *) buf + m);
-//                     cnt += m;
-//                     n -= m;
-//                     s->itotal -= (int) m;
-//                     todo -= m;
-//             }
-
-//             if (s->itotal == 0) {
-//                     short blksize = 0;
-
-//                     /* The current block has been completely read,
-//                      * so read the count for the next block, only
-//                      * if the previous was not the last one */
-//                     if (s->nr)
-//                             break;
-//                     switch (mnstr_readSht(s->s, &blksize)) {
-//                     case -1:
-//                             ss->errnr = s->s->errnr;
-//                             return -1;
-//                     case 0:
-//                             return 0;
-//                     case 1:
-//                             break;
-//                     }
-//                     if (blksize < 0) {
-//                             ss->errnr = MNSTR_READ_ERROR;
-//                             return -1;
-//                     }
-// #ifdef BSTREAM_DEBUG
-//                     fprintf(stderr, "RC size: %d, final: %s\n", blksize >> 1, blksize & 1 ? "true" : "false");
-//                     fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
-//                     fprintf(stderr, "RC %s %d\n", ss->name, blksize);
-// #endif
-//                     s->itotal = (unsigned) (blksize >> 1);  /* amount readable */
-//                     /* store whether this was the last block or not */
-//                     s->nr = blksize & 1;
-//                     s->bytes += s->itotal;
-//                     s->blks++;
-//             }
-//     }
-//     /* if we got an empty block with the end-of-sequence marker
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to