Changeset: d5d332081c12 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d5d332081c12
Added Files:
        common/protobuf/Makefile.ag
        common/protobuf/messages.c
        common/protobuf/messages.h
        common/protobuf/mhapi.proto
Modified Files:
        monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:

proto stuff


diffs (truncated from 313 to 300 lines):

diff --git a/common/protobuf/Makefile.ag b/common/protobuf/Makefile.ag
new file mode 100644
--- /dev/null
+++ b/common/protobuf/Makefile.ag
@@ -0,0 +1,25 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V.
+
+## Process this file with automake to produce Makefile.in
+
+MTSAFE
+
+INCLUDES = ../stream \
+               $(snappy_CFLAGS) \
+               $(protobuf_CFLAGS) 
+
+lib_protobuf  =  {
+       SOURCES = messages.c messages.h mhapi.pb-c.c mhapi.pb-c.h
+       LIBS = $(snappy_LIBS) \
+                  $(protobuf_LIBS)
+}
+
+headers_common = {
+       DIR = includedir/monetdb
+       HEADERS = h
+       SOURCES = messages.h mhapi.pb-c.h
+}
diff --git a/common/protobuf/messages.c b/common/protobuf/messages.c
new file mode 100644
--- /dev/null
+++ b/common/protobuf/messages.c
@@ -0,0 +1,175 @@
+#include "messages.h"
+#ifdef HAVE_LIBSNAPPY
+#include <snappy-c.h> // C forever
+#endif
+
+#include <stdarg.h>            /* va_alist.. */
+
+
+Mhapi__Message* message_read(stream *s, compression_method comp) {
+       lng len;
+       mnstr_readLng(s, &len);
+       return message_read_length(s, comp, len);
+}
+
+Mhapi__Message* message_read_length(stream *s, compression_method comp, lng 
len) {
+       Mhapi__Message* ret;
+       char* read_buf;
+
+       if (len < 1) {
+               return NULL;
+       }
+       read_buf = malloc(len);
+       if (!read_buf) {
+               return NULL;
+       }
+       if (mnstr_read(s, read_buf, len, 1) != len) {
+               return NULL;
+       }
+       switch(comp) {
+       case COMPRESSION_NONE:
+               break;
+       case COMPRESSION_SNAPPY:
+#ifdef HAVE_LIBSNAPPY
+       {
+               size_t uncompressed_length;
+               char *uncompressed_buf;
+               if (!snappy_uncompressed_length(read_buf, len, 
&uncompressed_length) == SNAPPY_OK) {
+                       free(read_buf);
+                       return NULL;
+               }
+               uncompressed_buf = malloc(uncompressed_length);
+               if (!uncompressed_buf) {
+                       free(read_buf);
+                       return NULL;
+               }
+               if (snappy_uncompress(read_buf, len, uncompressed_buf, 
&uncompressed_length) != SNAPPY_OK) {
+                       free(read_buf);
+                       free(uncompressed_buf);
+                       return NULL;
+               }
+               free(read_buf);
+               read_buf = uncompressed_buf;
+               len = uncompressed_length;
+       }
+#else
+               return NULL;
+#endif
+       }
+
+       ret = (Mhapi__Message*) 
protobuf_c_message_unpack(&mhapi__message__descriptor, NULL, len, (uint8_t*) 
read_buf);
+       free(read_buf);
+       return ret;
+}
+
+ssize_t message_write(stream *s, compression_method comp, Mhapi__Message *msg) 
{
+       lng len = protobuf_c_message_get_packed_size((ProtobufCMessage*) msg);
+       char* write_buf = malloc(len);
+       if (!write_buf) {
+               return -1;
+       }
+       if (protobuf_c_message_pack((ProtobufCMessage*) msg, (uint8_t*) 
write_buf) != (size_t) len) {
+               return -1;
+       }
+       switch(comp) {
+       case COMPRESSION_NONE:
+               break;
+       case COMPRESSION_SNAPPY:
+#ifdef HAVE_LIBSNAPPY
+       {
+               size_t compressed_length = snappy_max_compressed_length(len);
+               char *compressed_buf = malloc(compressed_length);
+               if (!compressed_buf) {
+                       free(write_buf);
+                       return NULL;
+               }
+               if (snappy_compress(write_buf, len, compressed_buf, 
&compressed_length) != SNAPPY_OK) {
+                       free(write_buf);
+                       free(compressed_buf);
+                       return NULL;
+               }
+               free(write_buf);
+               write_buf = compressed_buf;
+               len = compressed_length;
+       }
+#else
+               return -1;
+#endif
+       }
+
+       if (!mnstr_writeLng(s, len) || mnstr_write(s, write_buf, len, 1) != 
len) {
+               return -1;
+       }
+       return len;
+}
+
+void message_send_error(stream *s, protocol_version proto, const char *format, 
...) {
+       char buf[BUFSIZ], *bf = buf;
+       int i = 0;
+       va_list ap;
+
+       if (s == NULL) {
+               return;
+       }
+       va_start(ap, format);
+       i = vsnprintf(bf, BUFSIZ, format, ap);
+       va_end (ap);
+
+       if (proto == prot9) {
+               mnstr_printf(s, "!%s\n", bf);
+               mnstr_flush(s);
+       } else {
+               Mhapi__Message msg;
+               Mhapi__Error err;
+
+               mhapi__message__init(&msg);
+               mhapi__error__init(&err);
+
+               msg.message_case = MHAPI__MESSAGE__MESSAGE_ERROR;
+               err.message = bf;
+               msg.error = &err;
+
+               if (proto == prot10compressed) {
+                       message_write(s, COMPRESSION_SNAPPY, &msg);
+               } else {
+                       message_write(s, COMPRESSION_NONE, &msg);
+               }
+       }
+}
+
+// fixme this is mostly redundant
+
+void message_send_warning(stream *s, protocol_version proto, const char 
*format, ...) {
+       char buf[BUFSIZ], *bf = buf;
+       int i = 0;
+       va_list ap;
+
+       if (s == NULL) {
+               return;
+       }
+       va_start(ap, format);
+       i = vsnprintf(bf, BUFSIZ, format, ap);
+       va_end (ap);
+
+       if (proto == prot9) {
+               mnstr_printf(s, "#%s\n", bf);
+               mnstr_flush(s);
+       } else {
+               Mhapi__Message msg;
+               Mhapi__Warning err;
+
+               mhapi__message__init(&msg);
+               mhapi__warning__init(&err);
+
+               msg.message_case = MHAPI__MESSAGE__MESSAGE_WARNING;
+               err.message = bf;
+               msg.warning = &err;
+
+               if (proto == prot10compressed) {
+                       message_write(s, COMPRESSION_SNAPPY, &msg);
+               } else {
+                       message_write(s, COMPRESSION_NONE, &msg);
+               }
+       }
+}
+
diff --git a/common/protobuf/messages.h b/common/protobuf/messages.h
new file mode 100644
--- /dev/null
+++ b/common/protobuf/messages.h
@@ -0,0 +1,9 @@
+#include "mhapi.pb-c.h"
+#include "stream.h"
+
+Mhapi__Message* message_read(stream *s, compression_method comp);
+Mhapi__Message* message_read_length(stream *s, compression_method comp, lng 
length);
+
+ssize_t message_write(stream *s, compression_method comp, Mhapi__Message *msg);
+void message_send_error(stream *s, protocol_version proto, const char *format, 
...);
+void message_send_warning(stream *s, protocol_version proto, const char 
*format, ...);
diff --git a/common/protobuf/mhapi.proto b/common/protobuf/mhapi.proto
new file mode 100644
--- /dev/null
+++ b/common/protobuf/mhapi.proto
@@ -0,0 +1,71 @@
+package mhapi;
+
+// this is very preliminary
+
+
+message AuthResponse {
+       enum Version {
+               PROTO10 = 1;
+       }
+       optional Version protocol_version = 1;
+       optional string username = 2;
+       optional string password_hashed = 3;
+       optional string dbname = 4;
+       enum Scenario {
+       MAL = 0;
+               SQL = 1;
+       }
+       optional Scenario scenario = 5;
+       enum Compression {
+               NONE = 0;
+               SNAPPY = 1;
+       }
+       optional Compression compression_method = 6;
+}
+
+message Error {
+       optional string message = 1;
+}
+
+message Warning {
+       optional string message = 1;
+}
+
+message Query {
+       optional string message = 1;
+}
+
+
+message QueryResult {
+       optional int64 rows = 1;
+       message Column {
+               optional string name = 1;
+               optional string sqltype = 2;
+
+               enum Type {
+                       STRING = 1;
+                       INTEGER = 2;
+                       FLOAT = 3;
+                       // more types added here...
+                       // maybe these should be consistent with something else
+               }
+
+               optional Type type = 3;
+               repeated string stringValues = 10;
+               repeated int64 intValues = 11 [packed=true];
+               repeated double floatValues = 12 [packed=true];
+       }
+
+       repeated Column columns = 3;
+}
+
+
+message Message {
+       oneof message {
+               AuthResponse authresp = 2;
+               Query query = 3;
+               Error error = 5;
+               Warning warning = 6;
+               QueryResult queryresult = 7;
+       }
+}
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to