Changeset: 9cb1d66ebc89 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9cb1d66ebc89
Modified Files:
        clients/R/MonetDB.R/NAMESPACE
        clients/R/MonetDB.R/R/monetdb.R
        clients/R/MonetDB.R/src/mapi.c
Branch: default
Log Message:

R Connector
- Switched to external pointers for socket connection ID
- Fixed timeouts
- Fixed SO/DLL loading bug


diffs (236 lines):

diff --git a/clients/R/MonetDB.R/NAMESPACE b/clients/R/MonetDB.R/NAMESPACE
--- a/clients/R/MonetDB.R/NAMESPACE
+++ b/clients/R/MonetDB.R/NAMESPACE
@@ -1,4 +1,4 @@
-import(DBI,utils,stats)
+import(DBI,utils,stats,digest,methods)
 
 # export only driver constructor, everything else is DBI stuff..
 export(MonetDB,MonetR,MonetDBR,MonetDB.R)
diff --git a/clients/R/MonetDB.R/R/monetdb.R b/clients/R/MonetDB.R/R/monetdb.R
--- a/clients/R/MonetDB.R/R/monetdb.R
+++ b/clients/R/MonetDB.R/R/monetdb.R
@@ -1,6 +1,11 @@
 require(DBI)
 require(digest)
 
+.onLoad <- function(lib, pkg) {
+       library.dynam( "MonetDB.R", pkg, lib )
+       .Call("mapiInit",PACKAGE="MonetDB.R")
+}
+
 # TODO: make these values configurable in the call to dbConnect
 DEBUG_IO      <- FALSE
 DEBUG_QUERY   <- FALSE
@@ -59,17 +64,17 @@ setMethod("dbConnect", "MonetDBDriver", 
                                                                #       
blocking = TRUE, open="r+b",timeout = 5 )
                                                                
                                                                # this goes to src/mapi.c
-                                                               socket <- 
socket <<- .Call("mapiConnect",host,port,5)
+                                                               socket <- 
socket <<- .Call("mapiConnect",host,port,5,PACKAGE="MonetDB.R")
                                                                # authenticate
                                                                
.monetAuthenticate(socket,dbname,user,password)
                                                                # test the connection to make sure it works before
                                                                
.mapiWrite(socket,"sSELECT 42;"); .mapiRead(socket)
                                                                #close(socket)
-                                                               
.Call("mapiDisconnect",socket)
+                                                               
.Call("mapiDisconnect",socket,PACKAGE="MonetDB.R")
                                                                break
                                                        }, error = function(e) {
                                                                if 
("connection" %in% class(socket)) {
-                                                                       
.Call("mapiDisconnect",socket)
+                                                                       
.Call("mapiDisconnect",socket,PACKAGE="MonetDB.R")
                                                                }
                                                                
cat(paste0("Server not ready(",e$message,"), retrying (ESC or CTRL+C to 
abort)\n"))
                                                                Sys.sleep(1)
@@ -81,7 +86,7 @@ setMethod("dbConnect", "MonetDBDriver", 
                        # make new socket with user-specified timeout
                        #socket <- socket <<- socketConnection(host = host, 
port = port, 
                        #       blocking = TRUE, open="r+b",timeout = timeout) 
-                       socket <- socket <<- .Call("mapiConnect",host,port,5)
+                       socket <- socket <<- 
.Call("mapiConnect",host,port,timeout,PACKAGE="MonetDB.R")
                        .monetAuthenticate(socket,dbname,user,password)
                        connenv <- new.env(parent=emptyenv())
                        connenv$lock <- 0
@@ -94,10 +99,10 @@ setMethod("dbConnect", "MonetDBDriver", 
 
 
 ### MonetDBConnection, #monetdb_mapi_conn
-setClass("MonetDBConnection", 
representation("DBIConnection",socket="monetdb_mapi_conn",connenv="environment",fetchSize="integer"))
+setClass("MonetDBConnection", 
representation("DBIConnection",socket="externalptr",connenv="environment",fetchSize="integer"))
 
 setMethod("dbDisconnect", "MonetDBConnection", def=function(conn, ...) {
-                       .Call("mapiDisconnect",conn@socket)
+                       .Call("mapiDisconnect",conn@socket,PACKAGE="MonetDB.R")
                        TRUE
                })
 
@@ -557,9 +562,9 @@ REPLY_SIZE    <- 100 # Apparently, -1 me
 }
 
 .mapiRead <- function(con) {
-       if (!identical(class(con)[[1]],"monetdb_mapi_conn"))
-               stop("I can only be called with a monetdb_mapi_conn object as 
parameter.")
-       respstr <- .Call("mapiRead",con)
+       if (!identical(class(con)[[1]],"externalptr"))
+               stop("I can only be called with a MonetDB connection object as 
parameter.")
+       respstr <- .Call("mapiRead",con,PACKAGE="MonetDB.R")
        if (DEBUG_IO) {
                dstr <- respstr
                if (nchar(dstr) > 300) {
@@ -571,11 +576,11 @@ REPLY_SIZE    <- 100 # Apparently, -1 me
 }
 
 .mapiWrite <- function(con,msg) {
-       if (!identical(class(con)[[1]],"monetdb_mapi_conn"))
-               stop("I can only be called with a monetdb_mapi_conn object as 
parameter.")
+       if (!identical(class(con)[[1]],"externalptr"))
+               stop("I can only be called with a MonetDB connection object as 
parameter.")
        
        if (DEBUG_IO)  cat(paste("TX: '",msg,"'\n",sep=""))     
-       .Call("mapiWrite",con,msg)
+       .Call("mapiWrite",con,msg,PACKAGE="MonetDB.R")
        return (NULL)
 }
 
diff --git a/clients/R/MonetDB.R/src/mapi.c b/clients/R/MonetDB.R/src/mapi.c
--- a/clients/R/MonetDB.R/src/mapi.c
+++ b/clients/R/MonetDB.R/src/mapi.c
@@ -21,13 +21,34 @@
 #define BLOCKSIZE 8190
 #define BUFSIZE BLOCKSIZE+1
 #define SOCKET int
-#define SOCK_ATTR "mapi_conn_do_not_touch"
-#define CONN_CLASS "monetdb_mapi_conn"
 #define TRUE 1
 #define FALSE 0
 #define ALLOCSIZE 1048576 // 1 MB
 #define DEBUG FALSE
 
+// reference tricks taken from http://homepage.stat.uiowa.edu/~luke/R/simpleref.html#NWarqU3-KrSQa-1
+static SEXP MAPI_type_tag;
+
+#define CHECK_MAPI_SOCK(s) do { \
+    if (TYPEOF(s) != EXTPTRSXP || \
+        R_ExternalPtrTag(s) != MAPI_type_tag) \
+        error("bad socket"); \
+} while (0)
+
+SEXP mapiInit(void) {
+       MAPI_type_tag = install("MAPI_TYPE_TAG");
+       return R_NilValue;
+}
+
+SEXP mapiDisconnect(SEXP conn) {
+       CHECK_MAPI_SOCK(conn);
+       SOCKET *sock = R_ExternalPtrAddr(conn);
+       shutdown(*sock, 2);
+       R_ClearExternalPtr(conn);
+       free(sock);
+       return R_NilValue;
+}
+
 SEXP mapiConnect(SEXP host, SEXP port, SEXP timeout) {
        // be a bit paranoid about the parameters
        assert(IS_CHARACTER(host));
@@ -45,7 +66,7 @@ SEXP mapiConnect(SEXP host, SEXP port, S
        assert(portval > 0 && portval < 65535);
        assert(timeoutval > 0);
 
-       SEXP connobj, class, attr;
+       SEXP connobj;
        SOCKET sock;
 
        struct addrinfo hints;
@@ -69,9 +90,9 @@ SEXP mapiConnect(SEXP host, SEXP port, S
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;
 
+       // resolve dns name
        char portvalstr[15];
        sprintf(portvalstr, "%d", portval);
-
        int s = getaddrinfo(hostval, portvalstr, &hints, &result);
        if (s != 0) {
                error("ERROR, failed to resolve host %s", hostval);
@@ -101,7 +122,7 @@ SEXP mapiConnect(SEXP host, SEXP port, S
                        if (DEBUG) {
                                printf("II: Connected to %s:%s\n", hostval, 
portvalstr);
                        }
-                       break; /* Success */
+                       break; // Profit
                }
                close(sock);
        }
@@ -111,35 +132,24 @@ SEXP mapiConnect(SEXP host, SEXP port, S
        }
        freeaddrinfo(result);
 
-       if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &sto, 
sizeof(sto))
-                       < 0)
-               error("setsockopt failed");
-       if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &sto, 
sizeof(sto))
-                       < 0)
-               error("setsockopt failed");
+       // get the socket number off the stack so that we can use R external pointers for passing it around
+       // external pointers are very useful as they can have finalizers, in our case, mapiDisconnect()
+       void * sockaddr = malloc(sizeof(int));
+       if (sockaddr == NULL) {
+               error("Error in malloc() for a single integer, srsly?");
+       }
+       memcpy(sockaddr, &sock, sizeof(int));
 
-       // construct a r object of class monetdb_mapi_conn with an attribute holding the connection id
-       PROTECT(connobj = ScalarInteger(1));
-       PROTECT(attr = ScalarInteger(1));
-       PROTECT(class = allocVector(STRSXP, 1));
-       SET_STRING_ELT(class, 0, mkChar(CONN_CLASS));
-       classgets(connobj, class);
-       INTEGER_POINTER(attr)[0] = sock;
-       setAttrib(connobj, install(SOCK_ATTR), attr);
-       UNPROTECT(3);
+       PROTECT(connobj = R_MakeExternalPtr(sockaddr, MAPI_type_tag, 
R_NilValue));
+       R_RegisterCFinalizerEx(connobj, (R_CFinalizer_t) mapiDisconnect, 0);
+       CHECK_MAPI_SOCK(connobj);
+       UNPROTECT(1);
        return connobj;
 }
 
-SEXP mapiDisconnect(SEXP conn) {
-       SOCKET sock = INTEGER_POINTER(
-                       AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0];
-       shutdown(sock, 2);
-       return R_NilValue;
-}
-
 SEXP mapiRead(SEXP conn) {
-       SOCKET sock = INTEGER_POINTER(
-                       AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0];
+       CHECK_MAPI_SOCK(conn);
+       SOCKET sock = *((SOCKET*) R_ExternalPtrAddr(conn));
 
        SEXP lines;
        char read_buf[BUFSIZE];
@@ -157,7 +167,6 @@ SEXP mapiRead(SEXP conn) {
        while (!block_final) {
                //  read block header and extract block length and final bit from header
                // this assumes little-endianness (so sue me)
-
                n = recv(sock, (void *) &header, 2, MSG_WAITALL);
                if (n != 2) {
                        error("ERROR reading MAPI block header (%d)", n);
@@ -207,11 +216,12 @@ SEXP mapiRead(SEXP conn) {
 }
 
 SEXP mapiWrite(SEXP conn, SEXP message) {
+       CHECK_MAPI_SOCK(conn);
+       SOCKET sock = *((SOCKET*) R_ExternalPtrAddr(conn));
+
        assert(IS_CHARACTER(message));
        assert(GET_LENGTH(message) == 1);
 
-       SOCKET sock = INTEGER_POINTER(
-                       AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0];
        const char *messageval = CHAR(STRING_ELT(message, 0));
        assert(strlen(messageval) > 0);
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to