Changeset: 9cb1d66ebc89 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9cb1d66ebc89
Modified Files:
	clients/R/MonetDB.R/NAMESPACE
	clients/R/MonetDB.R/R/monetdb.R
	clients/R/MonetDB.R/src/mapi.c
Branch: default
Log Message:

R Connector
- Switched to external pointers for socket connection ID
- Fixed timeouts
- Fixed SO/DLL loading bug

diffs (236 lines):

diff --git a/clients/R/MonetDB.R/NAMESPACE b/clients/R/MonetDB.R/NAMESPACE --- a/clients/R/MonetDB.R/NAMESPACE +++ b/clients/R/MonetDB.R/NAMESPACE @@ -1,4 +1,4 @@ -import(DBI,utils,stats) +import(DBI,utils,stats,digest,methods) # export only driver constructor, everything else is DBI stuff.. export(MonetDB,MonetR,MonetDBR,MonetDB.R) diff --git a/clients/R/MonetDB.R/R/monetdb.R b/clients/R/MonetDB.R/R/monetdb.R --- a/clients/R/MonetDB.R/R/monetdb.R +++ b/clients/R/MonetDB.R/R/monetdb.R @@ -1,6 +1,11 @@ require(DBI) require(digest) +.onLoad <- function(lib, pkg) { + library.dynam( "MonetDB.R", pkg, lib ) + .Call("mapiInit",PACKAGE="MonetDB.R") +} + # TODO: make these values configurable in the call to dbConnect DEBUG_IO <- FALSE DEBUG_QUERY <- FALSE @@ -59,17 +64,17 @@ setMethod("dbConnect", "MonetDBDriver", # blocking = TRUE, open="r+b",timeout = 5 ) # this goes to src/mapi.c - socket <- socket <<- .Call("mapiConnect",host,port,5) + socket <- socket <<- .Call("mapiConnect",host,port,5,PACKAGE="MonetDB.R") # authenticate .monetAuthenticate(socket,dbname,user,password) # test the connection to make sure it works before .mapiWrite(socket,"sSELECT 42;"); .mapiRead(socket) #close(socket) - .Call("mapiDisconnect",socket) + .Call("mapiDisconnect",socket,PACKAGE="MonetDB.R") break }, error = function(e) { if ("connection" %in% class(socket)) { - .Call("mapiDisconnect",socket) + .Call("mapiDisconnect",socket,PACKAGE="MonetDB.R") } cat(paste0("Server not ready(",e$message,"), retrying (ESC or CTRL+C to abort)\n")) Sys.sleep(1) @@ -81,7 +86,7 @@ setMethod("dbConnect", "MonetDBDriver", # make new socket with user-specified timeout #socket <- socket <<- socketConnection(host = host, port = port, # blocking = TRUE, open="r+b",timeout = timeout) - socket <- socket <<- .Call("mapiConnect",host,port,5) + socket <- socket <<-
.Call("mapiConnect",host,port,timeout,PACKAGE="MonetDB.R") .monetAuthenticate(socket,dbname,user,password) connenv <- new.env(parent=emptyenv()) connenv$lock <- 0 @@ -94,10 +99,10 @@ setMethod("dbConnect", "MonetDBDriver", ### MonetDBConnection, #monetdb_mapi_conn -setClass("MonetDBConnection", representation("DBIConnection",socket="monetdb_mapi_conn",connenv="environment",fetchSize="integer")) +setClass("MonetDBConnection", representation("DBIConnection",socket="externalptr",connenv="environment",fetchSize="integer")) setMethod("dbDisconnect", "MonetDBConnection", def=function(conn, ...) { - .Call("mapiDisconnect",conn@socket) + .Call("mapiDisconnect",conn@socket,PACKAGE="MonetDB.R") TRUE }) @@ -557,9 +562,9 @@ REPLY_SIZE <- 100 # Apparently, -1 me } .mapiRead <- function(con) { - if (!identical(class(con)[[1]],"monetdb_mapi_conn")) - stop("I can only be called with a monetdb_mapi_conn object as parameter.") - respstr <- .Call("mapiRead",con) + if (!identical(class(con)[[1]],"externalptr")) + stop("I can only be called with a MonetDB connection object as parameter.") + respstr <- .Call("mapiRead",con,PACKAGE="MonetDB.R") if (DEBUG_IO) { dstr <- respstr if (nchar(dstr) > 300) { @@ -571,11 +576,11 @@ REPLY_SIZE <- 100 # Apparently, -1 me } .mapiWrite <- function(con,msg) { - if (!identical(class(con)[[1]],"monetdb_mapi_conn")) - stop("I can only be called with a monetdb_mapi_conn object as parameter.") + if (!identical(class(con)[[1]],"externalptr")) + stop("I can only be called with a MonetDB connection object as parameter.") if (DEBUG_IO) cat(paste("TX: '",msg,"'\n",sep="")) - .Call("mapiWrite",con,msg) + .Call("mapiWrite",con,msg,PACKAGE="MonetDB.R") return (NULL) } diff --git a/clients/R/MonetDB.R/src/mapi.c b/clients/R/MonetDB.R/src/mapi.c --- a/clients/R/MonetDB.R/src/mapi.c +++ b/clients/R/MonetDB.R/src/mapi.c @@ -21,13 +21,34 @@ #define BLOCKSIZE 8190 #define BUFSIZE BLOCKSIZE+1 #define SOCKET int -#define SOCK_ATTR "mapi_conn_do_not_touch" -#define 
CONN_CLASS "monetdb_mapi_conn" #define TRUE 1 #define FALSE 0 #define ALLOCSIZE 1048576 // 1 MB #define DEBUG FALSE +// reference tricks taken from http://homepage.stat.uiowa.edu/~luke/R/simpleref.html#NWarqU3-KrSQa-1 +static SEXP MAPI_type_tag; + +#define CHECK_MAPI_SOCK(s) do { \ + if (TYPEOF(s) != EXTPTRSXP || \ + R_ExternalPtrTag(s) != MAPI_type_tag) \ + error("bad socket"); \ +} while (0) + +SEXP mapiInit(void) { + MAPI_type_tag = install("MAPI_TYPE_TAG"); + return R_NilValue; +} + +SEXP mapiDisconnect(SEXP conn) { + CHECK_MAPI_SOCK(conn); + SOCKET *sock = R_ExternalPtrAddr(conn); + shutdown(*sock, 2); + R_ClearExternalPtr(conn); + free(sock); + return R_NilValue; +} + SEXP mapiConnect(SEXP host, SEXP port, SEXP timeout) { // be a bit paranoid about the parameters assert(IS_CHARACTER(host)); @@ -45,7 +66,7 @@ SEXP mapiConnect(SEXP host, SEXP port, S assert(portval > 0 && portval < 65535); assert(timeoutval > 0); - SEXP connobj, class, attr; + SEXP connobj; SOCKET sock; struct addrinfo hints; @@ -69,9 +90,9 @@ SEXP mapiConnect(SEXP host, SEXP port, S hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; + // resolve dns name char portvalstr[15]; sprintf(portvalstr, "%d", portval); - int s = getaddrinfo(hostval, portvalstr, &hints, &result); if (s != 0) { error("ERROR, failed to resolve host %s", hostval); @@ -101,7 +122,7 @@ SEXP mapiConnect(SEXP host, SEXP port, S if (DEBUG) { printf("II: Connected to %s:%s\n", hostval, portvalstr); } - break; /* Success */ + break; // Profit } close(sock); } @@ -111,35 +132,24 @@ SEXP mapiConnect(SEXP host, SEXP port, S } freeaddrinfo(result); - if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &sto, sizeof(sto)) - < 0) - error("setsockopt failed"); - if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *) &sto, sizeof(sto)) - < 0) - error("setsockopt failed"); + // get the socket number off the stack so that we can use R external pointers for passing it around + // external pointers are very useful as they 
can have finalizers, in our case, mapiDisconnect() + void * sockaddr = malloc(sizeof(int)); + if (sockaddr == NULL) { + error("Error in malloc() for a single integer, srsly?"); + } + memcpy(sockaddr, &sock, sizeof(int)); - // construct a r object of class monetdb_mapi_conn with an attribute holding the connection id - PROTECT(connobj = ScalarInteger(1)); - PROTECT(attr = ScalarInteger(1)); - PROTECT(class = allocVector(STRSXP, 1)); - SET_STRING_ELT(class, 0, mkChar(CONN_CLASS)); - classgets(connobj, class); - INTEGER_POINTER(attr)[0] = sock; - setAttrib(connobj, install(SOCK_ATTR), attr); - UNPROTECT(3); + PROTECT(connobj = R_MakeExternalPtr(sockaddr, MAPI_type_tag, R_NilValue)); + R_RegisterCFinalizerEx(connobj, (R_CFinalizer_t) mapiDisconnect, 0); + CHECK_MAPI_SOCK(connobj); + UNPROTECT(1); return connobj; } -SEXP mapiDisconnect(SEXP conn) { - SOCKET sock = INTEGER_POINTER( - AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0]; - shutdown(sock, 2); - return R_NilValue; -} - SEXP mapiRead(SEXP conn) { - SOCKET sock = INTEGER_POINTER( - AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0]; + CHECK_MAPI_SOCK(conn); + SOCKET sock = *((SOCKET*) R_ExternalPtrAddr(conn)); SEXP lines; char read_buf[BUFSIZE]; @@ -157,7 +167,6 @@ SEXP mapiRead(SEXP conn) { while (!block_final) { // read block header and extract block length and final bit from header // this assumes little-endianness (so sue me) - n = recv(sock, (void *) &header, 2, MSG_WAITALL); if (n != 2) { error("ERROR reading MAPI block header (%d)", n); @@ -207,11 +216,12 @@ SEXP mapiRead(SEXP conn) { } SEXP mapiWrite(SEXP conn, SEXP message) { + CHECK_MAPI_SOCK(conn); + SOCKET sock = *((SOCKET*) R_ExternalPtrAddr(conn)); + assert(IS_CHARACTER(message)); assert(GET_LENGTH(message) == 1); - SOCKET sock = INTEGER_POINTER( - AS_INTEGER(getAttrib(conn, install(SOCK_ATTR))))[0]; const char *messageval = CHAR(STRING_ELT(message, 0)); assert(strlen(messageval) > 0); _______________________________________________ checkin-list 
mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list