Changeset: 5210c24ef644 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5210c24ef644
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/Makefile.ag
        clients/mapilib/mapi.c
        common/stream/mhapi.proto
        common/stream/stream.c
        common/stream/stream.h
        monetdb5/modules/mal/mal_mapi.c
        sql/backends/monet5/Makefile.ag
        sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:

protobuf result sets first part


diffs (truncated from 362 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2957,7 +2957,7 @@ usage(const char *prog, int xit)
        fprintf(stderr, " -C version  | --compression=type specify compression 
method {snappy,lz4}\n");
        fprintf(stderr, " -P version  | --protocol=version specify protocol 
version {prot9,prot10,prot10compressed}\n");
        fprintf(stderr, " -B size     | --blocksize=size   specify protocol 
block size (>= %d)\n", BLOCK);
-       fprintf(stderr, " -c colcomp  | --colcomp=type     specify column 
compression type {none,pfor}");
+       fprintf(stderr, " -c colcomp  | --colcomp=type     specify column 
compression type {none,pfor,protobuf}");
 
        fprintf(stderr, " -H          | --history          load/save cmdline 
history (default off)\n");
        fprintf(stderr, " -i          | --interactive[=tm] interpret `\\' 
commands on stdin, use time formatting {ms,s,m}\n");
diff --git a/clients/mapilib/Makefile.ag b/clients/mapilib/Makefile.ag
--- a/clients/mapilib/Makefile.ag
+++ b/clients/mapilib/Makefile.ag
@@ -13,8 +13,9 @@ lib_mapi = {
        VERSION = $(MAPI_VERSION)
        SOURCES = mapi.c mapi.rc
        LIBS = $(SOCKET_LIBS) ../../common/stream/libstream \
+               ../../common/stream/libstream_protobuf \
                ../../common/options/libmoptions \
-               ../../common/utils/libmcrypt $(openssl_LIBS) $(pfor_LIBS)
+               ../../common/utils/libmcrypt $(openssl_LIBS) $(pfor_LIBS) 
$(protobuf_LIBS)
 }
 
 headers_mapi = {
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -913,6 +913,7 @@ struct MapiStruct {
        compression_method comp;
        column_compression colcomp;
        size_t blocksize;
+       void* protobuf_res;
 
        int trace;              /* Trace Mapi interaction */
        int auto_commit;
@@ -952,6 +953,8 @@ struct MapiResultSet {
        struct MapiRowBuf cache;
        int commentonly;        /* only comments seen so far */
        mapi_int64 rows_read;
+       mapi_int64 cur_row;
+
 };
 
 struct MapiStatement {
@@ -1905,6 +1908,7 @@ mapi_new(void)
        mid->protocol = protauto;
        mid->colcomp = COLUMN_COMPRESSION_AUTO;
        mid->blocksize = 128 * BLOCK; // 1 MB
+       mid->protobuf_res = NULL;
 
        mid->cachelimit = 100;
        mid->redircnt = 0;
@@ -2796,11 +2800,7 @@ mapi_reconnect(Mapi mid)
                                     mid->database == NULL ? "" : mid->database,
                                     prot_version == prot10 ? "PROT10" : 
"PROT10COMPR",
                                     comp == COMPRESSION_SNAPPY ? "SNAPPY" : 
(comp == COMPRESSION_LZ4 ? "LZ4" : ""),
-#ifdef HAVE_PFOR
-                                    mid->colcomp == COLUMN_COMPRESSION_PFOR ? 
",HAVEPFOR" : "",
-#else
-                                    "",
-#endif
+                                    mid->colcomp == COLUMN_COMPRESSION_PFOR ? 
",HAVEPFOR" :    (mid->colcomp == COLUMN_COMPRESSION_PROTOBUF ? ",PROTOBUF" : 
""),
                                    mid->blocksize);
                        } else {
                                retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:\n",
@@ -4236,7 +4236,7 @@ read_into_cache(MapiHdl hdl, int lookahe
                        result->querytype = Q_TABLE;
                        result->tuple_count = 0;
                        result->rows_read = 0;
-
+                       result->cur_row = 0;
 
                        for (i = 0; i < nr_cols; i++) {
                                lng col_info_length;
@@ -5533,6 +5533,10 @@ mapi_split_line(MapiHdl hdl)
        return n;
 }
 
+#ifdef HAVE_LIBPROTOBUF
+#include <mhapi.pb-c.h>
+#endif
+
 int
 mapi_fetch_row(MapiHdl hdl)
 {
@@ -5558,6 +5562,8 @@ mapi_fetch_row(MapiHdl hdl)
                if (result->rows_read >= result->tuple_count) {
                        // if our cache is empty, we read data from the socket
                        lng nrows = 0;
+                       result->cur_row = 1;
+
                        // first we write a prompt to the server indicating 
that we want another block of the result set
 
 #ifdef CONTINUATION_MESSAGE
@@ -5569,8 +5575,39 @@ mapi_fetch_row(MapiHdl hdl)
                        }
 #endif
 
+
+                       if (hdl->mid->colcomp == COLUMN_COMPRESSION_PROTOBUF) {
+                               buffer buf = bs2_buffer(hdl->mid->from);
+#ifndef HAVE_LIBPROTOBUF
+                               // TODO: complain
+#else
+                               Mhapi__QueryResult *res = 
mhapi__query_result__unpack(NULL, buf.pos, (const uint8_t *) buf.buf);
+                               assert(res->row_count <= result->row_count);
+                               assert(res->n_columns == (size_t) 
result->fieldcnt);
+
+                               for (i = 0; i < (size_t) result->fieldcnt; i++) 
{
+                                       Mhapi__QueryResult__Column *c = 
res->columns[i];
+                                       if (c->n_double_values > 0) {
+                                               result->fields[i].buffer_ptr = 
(char*) c->double_values;
+                                       } else if (c->n_int32_values > 0) {
+                                               result->fields[i].buffer_ptr =  
(char*) c->int32_values;
+                                       } else if (c->n_int64_values > 0) {
+                                               result->fields[i].buffer_ptr =  
(char*) c->int64_values;
+                                       }else if (c->n_string_values > 0) {
+                                               result->fields[i].buffer_ptr = 
(char*)  c->string_values[0];
+                                       }
+                               }
+                               result->tuple_count += res->row_count;
+                               result->rows_read++;
+                               if(hdl->mid->protobuf_res) {
+                                       free(hdl->mid->protobuf_res);
+                               }
+                               hdl->mid->protobuf_res = (void*) res;
+                               return result->fieldcnt;
+#endif
+                       }
+
                        bs2_resetbuf(hdl->mid->from); // kinda a bit evil
-                       assert(bs2_buffer(hdl->mid->from).pos == 0);
 
                        // this actually triggers the read of the entire block
                        // after this point we operate on the buffer
@@ -5622,12 +5659,18 @@ mapi_fetch_row(MapiHdl hdl)
                        for (i = 0; i < (size_t) result->fieldcnt; i++) {
                                if (result->fields[i].columnlength < 0) {
                                        // variable-length column
-                                       result->fields[i].buffer_ptr += 
strlen(result->fields[i].buffer_ptr) + 1;
+
+                                       if (hdl->mid->protobuf_res) {
+                                               result->fields[i].buffer_ptr = 
((Mhapi__QueryResult*) 
hdl->mid->protobuf_res)->columns[i]->string_values[result->cur_row];
+                                       } else {
+                                               result->fields[i].buffer_ptr += 
strlen(result->fields[i].buffer_ptr) + 1;
+                                       }
                                } else {
                                        result->fields[i].buffer_ptr += 
result->fields[i].columnlength;
                                }
                        }
                }
+               result->cur_row++;
                result->rows_read++;
                return result->fieldcnt;
        }
@@ -6038,6 +6081,8 @@ mapi_set_column_compression(Mapi mid, co
        }
        else if (strcasecmp(colcomp, "none") == 0) {
                mid->colcomp = COLUMN_COMPRESSION_NONE;
+       } else if (strcasecmp(colcomp, "protobuf") == 0) {
+               mid->colcomp = COLUMN_COMPRESSION_PROTOBUF;
        } else {
                mapi_setError(mid, "invalid column compression type", 
"mapi_set_compression", MERROR);
                return -1;
diff --git a/common/stream/mhapi.proto b/common/stream/mhapi.proto
--- a/common/stream/mhapi.proto
+++ b/common/stream/mhapi.proto
@@ -1,33 +1,34 @@
 package mhapi;
 
 message QueryResult {
-       required int32 result_id = 1;
+//     required int32 result_id = 1;
        required int64 row_count = 2;
-       required int64 col_count = 3;
+//     required int64 col_count = 3;
 
        message Column {
-               required string table_name = 1;
-               required string column_name = 2;
-               required string type_name = 3;
+//             required string table_name = 1;
+//             required string column_name = 2;
+//             required string type_name = 3;
 
                repeated string string_values = 4;
                repeated int64  int64_values = 5 [packed=true];
-               repeated int32  int32_value = 6 [packed=true];
+               repeated int32  int32_values = 6 [packed=true];
                repeated double double_values = 7 [packed=true];
        }
 
 
        message ColumnUnpacked {
-               required string table_name = 1;
-               required string column_name = 2;
-               required string type_name = 3;
+//             required string table_name = 1;
+//             required string column_name = 2;
+//             required string type_name = 3;
 
                repeated string string_values = 4;
-               repeated int64  int64_values = 5;
-               repeated int32  int32_value = 6;
-               repeated double double_values = 7;
+               repeated int64  int64_values = 5 [packed=false];
+               repeated int32  int32_values = 6 [packed=false];
+               repeated double double_values = 7 [packed=false];
        }
 
        repeated Column columns = 4;
        repeated ColumnUnpacked columns_unpacked = 5;
 }
+
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4461,6 +4461,12 @@ bs2_buffer(stream *ss) {
        return b;
 }
 
+void bs2_setpos(stream *ss, size_t pos) { /* Store pos into the nr field of the bs2 state behind stream ss; returns nothing. */
+       bs2 *s = (bs2 *) ss->stream_data.p; /* the bs2 state is taken from the stream's stream_data.p pointer */
+       assert(pos < s->bufsiz); /* pos must be strictly below bufsiz; only checked when assertions are enabled (no check under NDEBUG) */
+       s->nr = pos; /* NOTE(review): nr is presumably the current offset into the block buffer -- confirm against the bs2 struct definition */
+}
+
 column_compression
 bs2_colcomp(stream *ss) {
        bs2 *s = (bs2 *) ss->stream_data.p;
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -261,6 +261,8 @@ stream_export void* bs2_getbuf(stream *s
 stream_export void bs2_resetbuf(stream *ss);
 stream_export buffer bs2_buffer(stream *s);
 column_compression bs2_colcomp(stream *ss);
+stream_export void bs2_setpos(stream *ss, size_t pos);
+
 
 /* read block of data including the end of block marker */
 stream_export ssize_t mnstr_read_block(stream *s, void *buf, size_t elmsize, 
size_t cnt);
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -190,6 +190,9 @@ doChallenge(void *data)
                if (strstr(buf, "PFOR")) {
                        colcomp = COLUMN_COMPRESSION_PFOR;
                }
+               if (strstr(buf, "PROTOBUF")) {
+                               colcomp = COLUMN_COMPRESSION_PROTOBUF;
+                       }
 
                // FIXME: this leaks a block stream header
                if (buflen < BLOCK) {
diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag
--- a/sql/backends/monet5/Makefile.ag
+++ b/sql/backends/monet5/Makefile.ag
@@ -53,8 +53,10 @@ lib__sql = {
                   ../../../monetdb5/tools/libmonetdb5 \
                   ../../../gdk/libbat \
                   ../../../common/stream/libstream \
+                  ../../../common/stream/libstream_protobuf \
                   ../../../common/utils/libmcrypt \
                   $(PTHREAD_LIBS) \
+                  $(protobuf_LIBS) \
                   $(openssl_LIBS) $(MATH_LIBS) $(pfor_LIBS)
 }
 
diff --git a/sql/backends/monet5/sql_result.c b/sql/backends/monet5/sql_result.c
--- a/sql/backends/monet5/sql_result.c
+++ b/sql/backends/monet5/sql_result.c
@@ -1867,6 +1867,10 @@ static int write_str_term(stream* s, str
        return  mnstr_writeStr(s, val) && mnstr_writeBte(s, 0);
 }
 
+#ifdef HAVE_LIBPROTOBUF
+#include <mhapi.pb-c.h>
+#endif
+
 static int mvc_export_resultset_prot10(res_table* t, stream* s, stream *c, 
size_t bsize) {
        BAT *order;
        lng count;
@@ -2085,6 +2089,76 @@ static int mvc_export_resultset_prot10(r
 
                assert(bs2_buffer(s).pos == 0);
 
+               if (colcomp == COLUMN_COMPRESSION_PROTOBUF) {
+#ifndef HAVE_LIBPROTOBUF
+                       fprintf(stderr, "Can't use protobuf stuff.\n");
+                       goto cleanup;
+#else
+                       Mhapi__QueryResult msg;
+                       mhapi__query_result__init(&msg);
+                       msg.row_count = (int64_t)(row - srow);
+                       msg.n_columns = t->nr_cols;
+                       msg.columns = 
malloc(sizeof(Mhapi__QueryResult__Column)*t->nr_cols);
+                       if (!msg.columns) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to