Patch for cleaned-up and documented async connect support. I'll turn
that into a merge request once I've figured out how to do that.

diff --git a/Pg.h b/Pg.h
index 250ffab..27b9bce 100644
--- a/Pg.h
+++ b/Pg.h
@@ -121,6 +121,8 @@ DBISTATE_DECLARE;
 #define TRACE_PQCMDSTATUS          TRACE_XX "%sPQcmdStatus\n",           
THEADER_slow)
 #define TRACE_PQCMDTUPLES          TRACE_XX "%sPQcmdTuples\n",           
THEADER_slow)
 #define TRACE_PQCONNECTDB          TRACE_XX "%sPQconnectdb\n",           
THEADER_slow)
+#define TRACE_PQCONNECTSTART       TRACE_XX "%sPQconnectStart\n",       
THEADER_slow)
+#define TRACE_PQCONNECTPOLL        TRACE_XX "%sPQconnectPoll\n",        
THEADER_slow)
 #define TRACE_PQCONSUMEINPUT       TRACE_XX "%sPQconsumeInput\n",        
THEADER_slow)
 #define TRACE_PQDB                 TRACE_XX "%sPQdb\n",                  
THEADER_slow)
 #define TRACE_PQENDCOPY            TRACE_XX "%sPQendcopy\n",             
THEADER_slow)
diff --git a/Pg.pm b/Pg.pm
index da9d077..41e8c78 100644
--- a/Pg.pm
+++ b/Pg.pm
@@ -41,7 +41,7 @@ use 5.008001;
 
     our %EXPORT_TAGS =
         (
-         async => [qw($DBDPG_DEFAULT PG_ASYNC PG_OLDQUERY_CANCEL 
PG_OLDQUERY_WAIT)],
+         async => [qw($DBDPG_DEFAULT PG_ASYNC PG_OLDQUERY_CANCEL 
PG_OLDQUERY_WAIT PG_ASYNC_CONN_READ PG_ASYNC_CONN_WRITE)],
          pg_limits => [qw($DBDPG_DEFAULT
                        PG_MIN_SMALLINT PG_MAX_SMALLINT PG_MIN_INTEGER 
PG_MAX_INTEGER PG_MAX_BIGINT PG_MIN_BIGINT
                        PG_MIN_SMALLSERIAL PG_MAX_SMALLSERIAL PG_MIN_SERIAL 
PG_MAX_SERIAL PG_MIN_BIGSERIAL PG_MAX_BIGSERIAL)],
@@ -151,6 +151,7 @@ use 5.008001;
         # uncoverable branch false
         if (!$methods_are_installed) {
             DBD::Pg::db->install_method('pg_cancel');
+            DBD::Pg::db->install_method('pg_continue_connect');
             DBD::Pg::db->install_method('pg_endcopy');
             DBD::Pg::db->install_method('pg_error_field');
             DBD::Pg::db->install_method('pg_getline');
@@ -3223,6 +3224,25 @@ the L</fetchrow_hashref> method.
 Creates a copy of the database handle by connecting with the same parameters 
as the original 
 handle, then trying to merge the attributes. See the DBI documentation for 
complete usage.
 
+=head3 B<pg_continue_connect>
+
+  $rc = $dbh->pg_continue_connect();
+
+Continues an asynchronous connect operation. See B<Asynchronous
+Connect> below. After an asynchronous connect has been initiated, this
+method must be called in a loop for as long as it returns either 1 or
+2, indicating a desire to read or write data,
+respectively. After each such return, the next call to pg_continue_connect
+must not take place until the current pg_socket has been reported as
+readable or writable, respectively, e.g., via
+select.
+
+The method returns -1 if no asynchronous connect was in progress, -2 to
+indicate that an asynchronous connect failed and 0 if the connection
+was successfully established.
+
+The socket may have changed after each call to the method.
+
 =head2 Database Handle Attributes
 
 =head3 B<AutoCommit> (boolean)
@@ -4290,6 +4310,16 @@ as you don't need it anymore.
     $count = $sth2->fetchall_arrayref()->[0][0];
   }
 
+=head3 Asynchronous Connect
+
+Passing the attribute pg_async_connect to the DBI connect method, e.g.,
+
+  $dbh = DBI->connect('dbi:Pg:...', $username, $password,
+                      { pg_async_connect => 1 });
+
+starts an asynchronous connect. The B<pg_continue_connect> method must
+be used afterwards to complete the connection establishment process.
+
 =head2 Array support
 
 DBD::Pg allows arrays (as arrayrefs) to be passed in to both 
diff --git a/Pg.xs b/Pg.xs
index db0c10e..2769f9b 100644
--- a/Pg.xs
+++ b/Pg.xs
@@ -223,6 +223,9 @@ constant(name=Nullch)
     PG_OLDQUERY_CANCEL                = 2
     PG_OLDQUERY_WAIT                  = 4
 
+    PG_ASYNC_CONN_READ    = 1
+    PG_ASYNC_CONN_WRITE   = 2
+
     CODE:
         if (0==ix) {
             if (!name) {
@@ -847,6 +850,14 @@ _pg_type_info (type_sv=Nullsv)
         ST(0) = sv_2mortal( newSViv( type_num ) );
     }
 
+int
+pg_continue_connect(dbh)
+               SV* dbh
+       CODE:
+               RETVAL = pg_db_continue_connect(dbh);
+       OUTPUT:
+               RETVAL
+
 void
 pg_result(dbh)
     SV * dbh
diff --git a/dbdimp.c b/dbdimp.c
index ae5186d..c543226 100644
--- a/dbdimp.c
+++ b/dbdimp.c
@@ -68,6 +68,14 @@ typedef enum
     (SvROK(h) && SvTYPE(SvRV(h)) == SVt_PVHV &&                    \
      SvRMAGICAL(SvRV(h)) && (SvMAGIC(SvRV(h)))->mg_type == 'P')
 
+enum {
+       DBH_ASYNC_CANCELLED =   -1,
+       DBH_NO_ASYNC,
+       DBH_ASYNC,
+       DBH_ASYNC_CONNECT,
+        DBH_ASYNC_CONNECT_POLL
+};
+
 static void pg_error(pTHX_ SV *h, int error_num, const char *error_msg);
 static void pg_warn (void * arg, const char * message);
 static ExecStatusType _result(pTHX_ imp_dbh_t *imp_dbh, const char *sql);
@@ -92,9 +100,56 @@ void dbd_init (dbistate_t *dbistate)
 
 
 /* ================================================================== */
-int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, 
char * pwd, SV *attr)
+static int want_async_connect(pTHX_ SV *attrs)
 {
+       HV *hv;
+       SV **psv, *sv;
+
+       return
+               attrs
+               && (psv = hv_fetchs((HV *)SvRV(attrs), "pg_async_connect", 0))
+               && (sv = *psv)
+               && SvTRUE(sv);
+}
+
+static void after_connect_init(pTHX_ SV *dbh, imp_dbh_t * imp_dbh)
+{
+    /* Figure out what protocol this server is using (most likely 3) */
+    TRACE_PQPROTOCOLVERSION;
+    imp_dbh->pg_protocol = PQprotocolVersion(imp_dbh->conn);
+    if (TLOGIN_slow) TRC(DBILOGFP, "%sprotocol version %d\n", THEADER_slow, 
imp_dbh->pg_protocol);
+
+    /* Figure out this particular backend's version */
+    TRACE_PQSERVERVERSION;
+    imp_dbh->pg_server_version = PQserverVersion(imp_dbh->conn);
+    if (TLOGIN_slow) TRC(DBILOGFP, "%sserver version %d\n", THEADER_slow, 
imp_dbh->pg_server_version);
+
+    if (imp_dbh->pg_server_version < 80000) {
+        if (NULL != strstr(PQparameterStatus(imp_dbh->conn, "server_version"), 
"bouncer")) {
+            imp_dbh->pg_server_version = 90600;
+        }
+        else {
+            TRACE_PQERRORMESSAGE;
+            strncpy(imp_dbh->sqlstate, "08001", 6); /* 
sqlclient_unable_to_establish_sqlconnection */
+            pg_error(aTHX_ dbh, CONNECTION_BAD, "Server version 8.0 required");
+            TRACE_PQFINISH;
+            PQfinish(imp_dbh->conn);
+            sv_free((SV *)imp_dbh->savepoints);
+            if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", 
THEADER_slow);
+            return;
+        }
+    }
 
+    pg_db_detect_client_encoding_utf8(aTHX_ imp_dbh);
+    /* If the client_encoding is UTF8, flip the utf8 flag until convinced 
otherwise */
+    imp_dbh->pg_utf8_flag = imp_dbh->client_encoding_utf8;
+
+    /* Tell DBI that we should call disconnect when the handle dies */
+    DBIc_ACTIVE_on(imp_dbh);
+}
+
+int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, 
char * pwd, SV *attr)
+{
     dTHR;
     dTHX;
     char *         conn_str;
@@ -102,8 +157,15 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * 
dbname, char * uid, cha
     bool           inquote = DBDPG_FALSE;
     STRLEN         connect_string_size;
     ConnStatusType connstatus;
+    int            async_connect;
+
+    async_connect = want_async_connect(aTHX_ attr);
 
-    if (TSTART_slow) TRC(DBILOGFP, "%sBegin dbd_db_login\n", THEADER_slow);
+    if (TSTART_slow) {
+        TRC(DBILOGFP, "%sBegin dbd_db_login6\n", THEADER_slow);
+        if (async_connect)
+            TRC(DBILOGFP, "%sAsync connect requested\n", THEADER_slow);
+    }
 
     /* DBD::Pg syntax: 'dbname=dbname;host=host;port=port', 'User', 'Pass' */
     /* libpq syntax: 'dbname=dbname host=host port=port user=uid password=pwd' 
*/
@@ -172,12 +234,18 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * 
dbname, char * uid, cha
         TRACE_PQFINISH;
         PQfinish(imp_dbh->conn);
     }
-    
+
     /* Attempt the connection to the database */
     if (TLOGIN_slow) TRC(DBILOGFP, "%sLogin connection string: (%s)\n", 
THEADER_slow, conn_str);
-    TRACE_PQCONNECTDB;
-    imp_dbh->conn = PQconnectdb(conn_str);
-    if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection complete\n", THEADER_slow);
+    if (async_connect) {
+        TRACE_PQCONNECTSTART;
+        imp_dbh->conn = PQconnectStart(conn_str);
+        if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection started\n", THEADER_slow);
+    } else {
+        TRACE_PQCONNECTDB;
+        imp_dbh->conn = PQconnectdb(conn_str);
+        if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection complete\n", 
THEADER_slow);
+    }
     Safefree(conn_str);
 
     /* Set the initial sqlstate */
@@ -187,7 +255,8 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * 
dbname, char * uid, cha
     /* Check to see that the backend connection was successfully made */
     TRACE_PQSTATUS;
     connstatus = PQstatus(imp_dbh->conn);
-    if (CONNECTION_OK != connstatus) {
+    switch (connstatus) {
+    case CONNECTION_BAD:
         TRACE_PQERRORMESSAGE;
         strncpy(imp_dbh->sqlstate, "08006", 6); /* "CONNECTION FAILURE" */
         pg_error(aTHX_ dbh, connstatus, PQerrorMessage(imp_dbh->conn));
@@ -196,46 +265,15 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * 
dbname, char * uid, cha
         sv_free((SV *)imp_dbh->savepoints);
         if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", 
THEADER_slow);
         return 0;
+
+    case CONNECTION_OK:
+        async_connect = 0;
     }
 
     /* Call the pg_warn function anytime this connection raises a notice */
     TRACE_PQSETNOTICEPROCESSOR;
     (void)PQsetNoticeProcessor(imp_dbh->conn, pg_warn, (void *)SvRV(dbh));
     
-    /* Figure out what protocol this server is using (most likely 3) */
-    TRACE_PQPROTOCOLVERSION;
-    imp_dbh->pg_protocol = PQprotocolVersion(imp_dbh->conn);
-
-    /* Figure out this particular backend's version */
-    TRACE_PQSERVERVERSION;
-    imp_dbh->pg_server_version = PQserverVersion(imp_dbh->conn);
-
-    if (imp_dbh->pg_server_version < 80000) {
-        /* 
-           Special workaround for PgBouncer, which has the unfortunate habit 
of modifying 'server_version', 
-           something it should never do. If we think this is the case for the 
version failure, we 
-           simply allow things to continue with a faked version. See github 
issue #47
-        */
-        if (NULL != strstr(PQparameterStatus(imp_dbh->conn, "server_version"), 
"bouncer")) {
-           imp_dbh->pg_server_version = 90600;
-        }
-        else {
-            TRACE_PQERRORMESSAGE;
-            strncpy(imp_dbh->sqlstate, "08001", 6); /* 
sqlclient_unable_to_establish_sqlconnection */
-            pg_error(aTHX_ dbh, CONNECTION_BAD, "Server version 8.0 required");
-            TRACE_PQFINISH;
-            PQfinish(imp_dbh->conn);
-            sv_free((SV *)imp_dbh->savepoints);
-            if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", 
THEADER_slow);
-            return 0;
-        }
-    }
-
-    pg_db_detect_client_encoding_utf8(aTHX_ imp_dbh);
-
-    /* If the client_encoding is UTF8, flip the utf8 flag until convinced 
otherwise */
-    imp_dbh->pg_utf8_flag = imp_dbh->client_encoding_utf8;
-
     imp_dbh->pg_enable_utf8  = -1;
 
     imp_dbh->prepare_now       = DBDPG_FALSE;
@@ -252,18 +290,18 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * 
dbname, char * uid, cha
     imp_dbh->copystate         = 0;
     imp_dbh->copybinary        = DBDPG_FALSE;
     imp_dbh->pg_errorlevel     = 1; /* Default */
-    imp_dbh->async_status      = 0;
+    imp_dbh->async_status      = DBH_NO_ASYNC;
     imp_dbh->async_sth         = NULL;
     imp_dbh->last_result       = NULL; /* NULL or the last PGresult returned 
by a database or statement handle */
     imp_dbh->result_clearable  = DBDPG_TRUE;
     imp_dbh->pg_int8_as_string = DBDPG_FALSE;
     imp_dbh->skip_deallocate   = DBDPG_FALSE;
 
-    /* Tell DBI that we should call destroy when the handle dies */
-    DBIc_IMPSET_on(imp_dbh);
-
-    /* Tell DBI that we should call disconnect when the handle dies */
-    DBIc_ACTIVE_on(imp_dbh);
+    /* if not connecting asynchronously, do after connect init */
+    imp_dbh->pg_protocol = -1;
+    imp_dbh->pg_server_version = -1;
+    if (async_connect) imp_dbh->async_status = DBH_ASYNC_CONNECT;
+    else after_connect_init(aTHX_ dbh, imp_dbh);
 
     if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login\n", THEADER_slow);
 
@@ -271,6 +309,62 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * 
dbname, char * uid, cha
 
 } /* end of dbd_db_login */
 
+int pg_db_continue_connect(SV *dbh)
+{
+    dTHX;
+    D_imp_dbh(dbh);
+    int status;
+
+    if (TSTART_slow)
+        TRC(DBILOGFP, "%sBegin pg_db_continue_connect\n", THEADER_slow);
+
+    switch (imp_dbh->async_status) {
+    default:
+        pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, "No async connect in 
progress\n");
+        status = -1;
+        break;
+
+    case DBH_ASYNC_CONNECT:
+        imp_dbh->async_status = DBH_ASYNC_CONNECT_POLL;
+        status = PGRES_POLLING_WRITING;
+        break;
+
+    case DBH_ASYNC_CONNECT_POLL:
+        TRACE_PQCONNECTPOLL;
+        status = PQconnectPoll(imp_dbh->conn);
+        if (TRACE5_slow) TRC(DBILOGFP, "%sPQconnectPoll returned %d\n", 
THEADER_slow, status);
+
+        switch (status) {
+        case PGRES_POLLING_READING:
+        case PGRES_POLLING_WRITING:
+            break;
+
+        case PGRES_POLLING_OK:
+            if (TLOGIN_slow) TRC(DBILOGFP, "%sconnection established\n", 
THEADER_slow);
+
+            imp_dbh->async_status = DBH_NO_ASYNC;
+            after_connect_init(aTHX_ dbh, imp_dbh);
+
+            status = 0;
+            break;
+
+        case PGRES_POLLING_FAILED:
+            TRACE_PQERRORMESSAGE;
+            strncpy(imp_dbh->sqlstate, "08006", 6); /* "CONNECTION FAILURE" */
+            pg_error(aTHX_ dbh, PQstatus(imp_dbh->conn), 
PQerrorMessage(imp_dbh->conn));
+            TRACE_PQFINISH;
+            PQfinish(imp_dbh->conn);
+            imp_dbh->conn = NULL;
+
+            imp_dbh->async_status = DBH_NO_ASYNC;
+
+            status = -2;
+        }
+    }
+
+    if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_continue_connect\n", 
THEADER_slow);
+    return status;
+}
 
 /* ================================================================== */
 /* 
diff --git a/dbdimp.h b/dbdimp.h
index de74bde..9dc4d08 100644
--- a/dbdimp.h
+++ b/dbdimp.h
@@ -144,6 +144,9 @@ extern void dbd_init (dbistate_t *dbistate);
 #define dbd_db_login6 pg_db_login6
 int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, 
char * pwd, SV *attr);
 
+#define dbd_db_continue_connect pg_db_continue_connect
+int dbd_db_continue_connect(SV *h);
+
 #define dbd_db_ping  pg_db_ping
 int dbd_db_ping(SV *dbh);
 

Reply via email to