Changeset: 721c8b3d945f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=721c8b3d945f
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        clients/mapilib/mapi.h
        common/stream/stream.c
        common/stream/stream.h
        monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:

mclient support for new block stream, compression working


diffs (truncated from 615 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2932,6 +2932,10 @@ usage(const char *prog, int xit)
        fprintf(stderr, " -E charset  | --encoding=charset specify encoding (character set) of the terminal\n");
 #endif
        fprintf(stderr, " -f kind     | --format=kind      specify output format {csv,tab,raw,sql,xml}\n");
+
+       fprintf(stderr, " -P version  | --protocol=version specify protocol version {prot9,prot10,prot10compressed}\n");
+       fprintf(stderr, " -B size     | --blocksize=size   specify protocol block size (>= %d)\n", BLOCK);
+
        fprintf(stderr, " -H          | --history          load/save cmdline history (default off)\n");
        fprintf(stderr, " -i          | --interactive[=tm] interpret `\\' commands on stdin, use time formatting {ms,s,m}\n");
        fprintf(stderr, " -l language | --language=lang    {sql,mal}\n");
@@ -2969,6 +2973,8 @@ main(int argc, char **argv)
        char *command = NULL;
        char *dbname = NULL;
        char *output = NULL;    /* output format as string */
+       char *protocol = NULL;
+       size_t blocksize = 0;
        FILE *fp = NULL;
        int trace = 0;
        int dump = 0;
@@ -2992,6 +2998,9 @@ main(int argc, char **argv)
                {"encoding", 1, 0, 'E'},
 #endif
                {"format", 1, 0, 'f'},
+               {"protocol", 1, 0, 'P'},
+               {"blocksize", 1, 0, 'B'},
+
                {"help", 0, 0, '?'},
                {"history", 0, 0, 'H'},
                {"host", 1, 0, 'h'},
@@ -3125,6 +3134,16 @@ main(int argc, char **argv)
                                free(output);
                        output = strdup(optarg);        /* output format */
                        break;
+               case 'P':
+                       assert(optarg);
+                       if (protocol != NULL)
+                               free(protocol);
+                       protocol = strdup(optarg);
+                       break;
+               case 'B':
+                       assert(optarg);
+                       blocksize = (size_t) atol(optarg);
+                       break;
                case 'i':
                        interactive = 1;
                        showtiming = 1;
@@ -3271,6 +3290,30 @@ main(int argc, char **argv)
        if (passwd)
                free(passwd);
        passwd = NULL;
+
+       if (blocksize > 0) {
+               if (blocksize < BLOCK) {
+                       fprintf(stderr, "invalid block size (needs to be bigger than %d)\n", BLOCK);
+               } else {
+                       mapi_set_blocksize(mid, blocksize);
+               }
+       }
+
+       if (protocol) {
+               if (strcasecmp(protocol, "prot9") == 0) {
+                       mapi_set_protocol(mid, prot9);
+               }
+               else if (strcasecmp(protocol, "prot10") == 0) {
+                       mapi_set_protocol(mid, prot10);
+               }
+               else if (strcasecmp(protocol, "prot10compressed") == 0) {
+                       mapi_set_protocol(mid, prot10compressed);
+               }
+               else {
+                       fprintf(stderr, "invalid protocol name '%s'\n", protocol);
+               }
+       }
+
        if (mid && mapi_error(mid) == MOK)
                mapi_reconnect(mid);    /* actually, initial connect */
 
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -881,6 +881,7 @@ struct BlockCache {
        int eos;                /* end of sequence */
 };
 
+
 /* A connection to a server is represented by a struct MapiStruct.  An
    application can have any number of connections to any number of
    servers.  Connections are completely independent of each other.
@@ -897,6 +898,8 @@ struct MapiStruct {
        char *uri;
        int languageId;
        char *motd;             /* welcome message from server */
+       protocol_version protocol;
+       size_t blocksize;
 
        int trace;              /* Trace Mapi interaction */
        int auto_commit;
@@ -1884,6 +1887,9 @@ mapi_new(void)
        mid->username = NULL;
        mid->password = NULL;
 
+       mid->protocol = protauto;
+       mid->blocksize = 128*BLOCK; // 1 MB
+
        mid->cachelimit = 100;
        mid->redircnt = 0;
        mid->redirmax = 10;
@@ -2193,11 +2199,6 @@ mapi_destroy(Mapi mid)
        return MOK;
 }
 
-typedef enum {
-       prot9 = 1,
-       prot10 = 2,
-       prot10compressed = 3,
-} protocol_version;
 
 /* (Re-)establish a connection with the server. */
 MapiMsg
@@ -2214,8 +2215,6 @@ mapi_reconnect(Mapi mid)
        char *protover;
        char *rest;
        protocol_version prot_version = prot9;
-       // FIXME: make this configurable
-       size_t block_size = 1024000;
 
        if (mid->connected)
                close_connection(mid);
@@ -2631,17 +2630,37 @@ mapi_reconnect(Mapi mid)
                }
 
 #ifdef HAVE_LIBSNAPPY
-               if (strstr(hashes, "PROT10COMPRESSED")) {
+               if (strstr(hashes, "PROT10COMPR")) {
                        // both server and client support compressed protocol 10; use compressed version 
-                       prot_version = prot10compressed;
+                       if (mid->protocol == protauto) {
+                               prot_version = prot10compressed;
+                       } else {
+                               prot_version = mid->protocol;
+                       }
                } else
 #endif
                if (strstr(hashes, "PROT10")) {
                        // both server and client support protocol 10; use protocol 10
-                       prot_version = prot10;
+                       if (mid->protocol == protauto) {
+                               prot_version = prot10;
+                       } else {
+                               if (mid->protocol == prot10compressed) {
+                                       mapi_setError(mid, "Either client or server do not support protocol compression", "mapi_reconnect", MERROR);
+                                       close_connection(mid);
+                                       return mid->error;
+                               } else {
+                                       prot_version = mid->protocol;
+                               }
+                       }
                } else {
                        // connecting to old server; use protocol 9
-                       prot_version = prot9;
+                       if (mid->protocol == prot9 || mid->protocol == protauto) {
+                               prot_version = prot9;
+                       } else {
+                               mapi_setError(mid, "Either client or server do not support protocol compression", "mapi_reconnect", MERROR);
+                               close_connection(mid);
+                               return mid->error;
+                       }
                }
 
                /* in rest now should be the byte order of the server */
@@ -2742,8 +2761,8 @@ mapi_reconnect(Mapi mid)
        #endif
                                     mid->username, hash, mid->language,
                                     mid->database == NULL ? "" : mid->database,
-                                    prot_version == prot10 ? "PROT10" : "PROT10COMPRESSED",
-                                   block_size);
+                                    prot_version == prot10 ? "PROT10" : "PROT10COMPR",
+                                   mid->blocksize);
                        } else {
                                retval = snprintf(buf, BLOCK, "%s:%s:%s:%s:%s:\n",
        #ifdef WORDS_BIGENDIAN
@@ -2785,29 +2804,23 @@ mapi_reconnect(Mapi mid)
 
        if (prot_version == prot10 || prot_version == prot10compressed) {
 
-               printf("Using protocol version %s.\n", prot_version == prot10  ? "PROT10" : "PROT10COMPRESSED");
+               printf("Using protocol version %s.\n", prot_version == prot10  ? "PROT10" : "PROT10COMPR");
                assert(isa_block_stream(mid->to));
                assert(isa_block_stream(mid->from));
 
                if (prot_version == prot10compressed) {
-#ifdef HAVE_LIBSNAPPY2
-                       mid->to = compressed_stream(bs_to->s, COMPRESSION_SNAPPY);
-                       mid->from = compressed_stream(bs_from->s, COMPRESSION_SNAPPY);
+#ifdef HAVE_LIBSNAPPY
+                       mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, COMPRESSION_SNAPPY);
+                       mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, COMPRESSION_SNAPPY);
 #else
                        assert(0);
 #endif
                } else {
-                       // FIXME: figure out proper stream sizes
-                       mid->to = block_stream2(bs_stream(mid->to), block_size);
-                       mid->from = block_stream2(bs_stream(mid->from), block_size);
+                       mid->to = block_stream2(bs_stream(mid->to), mid->blocksize, COMPRESSION_NONE);
+                       mid->from = block_stream2(bs_stream(mid->from), mid->blocksize, COMPRESSION_NONE);
                }
 
-               // FIXME: this leaks
-//             bs_to->s = NULL;
-//             bs_from->s = NULL;
-//             close_stream((stream*) bs_to);
-//             close_stream((stream*) bs_from);
-
+               // FIXME: this leaks a block stream header
        }
 
        /* consume the welcome message from the server */
@@ -5527,3 +5540,15 @@ mapi_get_active(Mapi mid)
        return mid->active;
 }
 
+void mapi_set_protocol(Mapi mid, protocol_version prot) {
+       mid->protocol = prot;
+}
+
+void mapi_set_blocksize(Mapi mid, size_t blocksize) {
+       if (blocksize >= BLOCK) {
+               mid->blocksize = blocksize;
+       }
+}
+
+
+
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -129,6 +129,13 @@ typedef struct {           /* used by MAPI_DATETI
        unsigned int fraction;  /* in 1000 millionths of a second (10e-9) */
 } MapiDateTime;
 
+typedef enum {
+       protauto = 0,
+       prot9 = 1,
+       prot10 = 2,
+       prot10compressed = 3,
+} protocol_version;
+
 /* connection-oriented functions */
 mapi_export Mapi mapi_mapi(const char *host, int port, const char *username, const char *password, const char *lang, const char *dbname);
 mapi_export Mapi mapi_mapiuri(const char *url, const char *user, const char *pass, const char *lang);
@@ -232,6 +239,10 @@ mapi_export int mapi_get_tableid(MapiHdl
 mapi_export char *mapi_quote(const char *msg, int size);
 mapi_export char *mapi_unquote(char *msg);
 mapi_export MapiHdl mapi_get_active(Mapi mid);
+
+mapi_export void mapi_set_protocol(Mapi mid, protocol_version prot);
+mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize);
+
 #ifdef _MSC_VER
 mapi_export const char *wsaerror(int);
 #endif
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -93,6 +93,9 @@
 #ifdef HAVE_LIBLZMA
 #include <lzma.h>
 #endif
+#ifdef HAVE_LIBSNAPPY
+#include <snappy-c.h> // C forever
+#endif
 
 #ifdef HAVE_ICONV
 #ifdef HAVE_ICONV_H
@@ -3974,13 +3977,15 @@ typedef struct bs2 {
        size_t nr;              /* how far we got in buf */
        size_t itotal;  /* amount available in current read block */
        size_t bufsiz;
+       compression_method comp;
+       char *compbuf;
+       size_t compbufsiz;
        char buf[0];    /* the buffered data (minus the size of
                                 * size-short */
-
 } bs2;
 
 static bs2 *
-bs2_create(stream *s, size_t bufsiz)
+bs2_create(stream *s, size_t bufsiz, compression_method comp)
 {
        /* should be a binary stream */
        bs2 *ns;
@@ -3991,6 +3996,21 @@ bs2_create(stream *s, size_t bufsiz)
        ns->nr = 0;
        ns->itotal = 0;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to