Patch for cleaned-up and documented async connect support. I'll turn
that into a merge request once I've figured out how to do that.
diff --git a/Pg.h b/Pg.h
index 250ffab..27b9bce 100644
--- a/Pg.h
+++ b/Pg.h
@@ -121,6 +121,8 @@ DBISTATE_DECLARE;
#define TRACE_PQCMDSTATUS TRACE_XX "%sPQcmdStatus\n",
THEADER_slow)
#define TRACE_PQCMDTUPLES TRACE_XX "%sPQcmdTuples\n",
THEADER_slow)
#define TRACE_PQCONNECTDB TRACE_XX "%sPQconnectdb\n",
THEADER_slow)
+#define TRACE_PQCONNECTSTART TRACE_XX "%sPQconnectStart\n",
THEADER_slow)
+#define TRACE_PQCONNECTPOLL TRACE_XX "%sPQconnectPoll\n",
THEADER_slow)
#define TRACE_PQCONSUMEINPUT TRACE_XX "%sPQconsumeInput\n",
THEADER_slow)
#define TRACE_PQDB TRACE_XX "%sPQdb\n",
THEADER_slow)
#define TRACE_PQENDCOPY TRACE_XX "%sPQendcopy\n",
THEADER_slow)
diff --git a/Pg.pm b/Pg.pm
index da9d077..41e8c78 100644
--- a/Pg.pm
+++ b/Pg.pm
@@ -41,7 +41,7 @@ use 5.008001;
our %EXPORT_TAGS =
(
- async => [qw($DBDPG_DEFAULT PG_ASYNC PG_OLDQUERY_CANCEL
PG_OLDQUERY_WAIT)],
+ async => [qw($DBDPG_DEFAULT PG_ASYNC PG_OLDQUERY_CANCEL
PG_OLDQUERY_WAIT PG_ASYNC_CONN_READ PG_ASYNC_CONN_WRITE)],
pg_limits => [qw($DBDPG_DEFAULT
PG_MIN_SMALLINT PG_MAX_SMALLINT PG_MIN_INTEGER
PG_MAX_INTEGER PG_MAX_BIGINT PG_MIN_BIGINT
PG_MIN_SMALLSERIAL PG_MAX_SMALLSERIAL PG_MIN_SERIAL
PG_MAX_SERIAL PG_MIN_BIGSERIAL PG_MAX_BIGSERIAL)],
@@ -151,6 +151,7 @@ use 5.008001;
# uncoverable branch false
if (!$methods_are_installed) {
DBD::Pg::db->install_method('pg_cancel');
+ DBD::Pg::db->install_method('pg_continue_connect');
DBD::Pg::db->install_method('pg_endcopy');
DBD::Pg::db->install_method('pg_error_field');
DBD::Pg::db->install_method('pg_getline');
@@ -3223,6 +3224,25 @@ the L</fetchrow_hashref> method.
Creates a copy of the database handle by connecting with the same parameters
as the original
handle, then trying to merge the attributes. See the DBI documentation for
complete usage.
+=head3 B<pg_continue_connect>
+
+ $rc = $dbh->pg_continue_connect();
+
+Continues an asynchronous connect operation. See B<Asynchronous
+Connect> below. After an asynchronous connect has been initiated, this
+method must be called in a loop for as long as it returns either 1
+(PG_ASYNC_CONN_READ) or 2 (PG_ASYNC_CONN_WRITE), indicating a desire to
+read or write data, respectively. After each such return, the next call
+to pg_continue_connect must not take place until the current pg_socket
+has been reported as readable or writable, respectively, e.g. via
+select.
+
+The method returns -1 if no asynchronous connect was in progress, -2 to
+indicate that an asynchronous connect failed and 0 if the connection
+was successfully established.
+
+The socket may have changed after each call to the method.
+
=head2 Database Handle Attributes
=head3 B<AutoCommit> (boolean)
@@ -4290,6 +4310,16 @@ as you don't need it anymore.
$count = $sth2->fetchall_arrayref()->[0][0];
}
+=head3 Asynchronous Connect
+
+Passing the attribute pg_async_connect to the DBI connect method, eg,
+
+ $dbh = DBI->connect('dbi:Pg:...', $username, $password,
+ { pg_async_connect => 1 });
+
+starts an asynchronous connect. The B<pg_continue_connect> method must
+be used afterwards to complete the connection establishment process.
+
=head2 Array support
DBD::Pg allows arrays (as arrayrefs) to be passed in to both
diff --git a/Pg.xs b/Pg.xs
index db0c10e..2769f9b 100644
--- a/Pg.xs
+++ b/Pg.xs
@@ -223,6 +223,9 @@ constant(name=Nullch)
PG_OLDQUERY_CANCEL = 2
PG_OLDQUERY_WAIT = 4
+ PG_ASYNC_CONN_READ = 1
+ PG_ASYNC_CONN_WRITE = 2
+
CODE:
if (0==ix) {
if (!name) {
@@ -847,6 +850,14 @@ _pg_type_info (type_sv=Nullsv)
ST(0) = sv_2mortal( newSViv( type_num ) );
}
+int
+pg_continue_connect(dbh)
+ SV* dbh
+ CODE:
+ RETVAL = pg_db_continue_connect(dbh);
+ OUTPUT:
+ RETVAL
+
void
pg_result(dbh)
SV * dbh
diff --git a/dbdimp.c b/dbdimp.c
index ae5186d..c543226 100644
--- a/dbdimp.c
+++ b/dbdimp.c
@@ -68,6 +68,14 @@ typedef enum
(SvROK(h) && SvTYPE(SvRV(h)) == SVt_PVHV && \
SvRMAGICAL(SvRV(h)) && (SvMAGIC(SvRV(h)))->mg_type == 'P')
+enum { /* values stored in imp_dbh->async_status */
+ DBH_ASYNC_CANCELLED = -1, /* not referenced in the visible hunks; presumably a cancelled async query -- TODO confirm */
+ DBH_NO_ASYNC, /* 0: no asynchronous operation pending (replaces the former literal 0 in dbd_db_login6) */
+ DBH_ASYNC, /* 1: not referenced in the visible hunks; presumably an async query in flight -- TODO confirm */
+ DBH_ASYNC_CONNECT, /* 2: set by dbd_db_login6 when an async connect was started and is not yet complete */
+ DBH_ASYNC_CONNECT_POLL /* 3: set by the first pg_db_continue_connect call; later calls go through PQconnectPoll */
+};
+
static void pg_error(pTHX_ SV *h, int error_num, const char *error_msg);
static void pg_warn (void * arg, const char * message);
static ExecStatusType _result(pTHX_ imp_dbh_t *imp_dbh, const char *sql);
@@ -92,9 +100,56 @@ void dbd_init (dbistate_t *dbistate)
/* ================================================================== */
-int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid,
char * pwd, SV *attr)
+/*
+ * Tell whether the connect attributes request an asynchronous connect.
+ *
+ * attrs: the attribute SV handed to dbd_db_login6; may be NULL or something
+ *        other than a hash reference, in which case the answer is "no".
+ *
+ * Returns 1 if attrs is a hash reference whose "pg_async_connect" entry
+ * exists and is true, 0 otherwise.
+ */
+static int want_async_connect(pTHX_ SV *attrs)
{
+    SV **psv, *sv;
+
+    /* SvRV is only meaningful on a reference, and hv_fetchs only on a hash */
+    if (!attrs || !SvROK(attrs) || SvTYPE(SvRV(attrs)) != SVt_PVHV)
+        return 0;
+
+    psv = hv_fetchs((HV *)SvRV(attrs), "pg_async_connect", 0);
+    if (!psv)
+        return 0;
+
+    sv = *psv;
+    return sv && SvTRUE(sv);
+}
+
+/*
+ * Finish setting up a database handle once its libpq connection is
+ * established. Shared by the synchronous path in dbd_db_login6 and the
+ * asynchronous path in pg_db_continue_connect.
+ *
+ * On success it records the protocol and server version, detects a UTF8
+ * client encoding, and marks the handle active.
+ *
+ * If the server reports a version below 8.0 (and is not PgBouncer), an error
+ * is raised on dbh, the connection is closed and imp_dbh->conn is set to
+ * NULL. The handle is then NOT marked active, which is how a caller can tell
+ * that initialisation failed, since this function returns nothing.
+ */
+static void after_connect_init(pTHX_ SV *dbh, imp_dbh_t * imp_dbh)
+{
+    const char *version_string;
+
+    /* Figure out what protocol this server is using (most likely 3) */
+    TRACE_PQPROTOCOLVERSION;
+    imp_dbh->pg_protocol = PQprotocolVersion(imp_dbh->conn);
+    if (TLOGIN_slow) TRC(DBILOGFP, "%sprotocol version %d\n", THEADER_slow, imp_dbh->pg_protocol);
+
+    /* Figure out this particular backend's version */
+    TRACE_PQSERVERVERSION;
+    imp_dbh->pg_server_version = PQserverVersion(imp_dbh->conn);
+    if (TLOGIN_slow) TRC(DBILOGFP, "%sserver version %d\n", THEADER_slow, imp_dbh->pg_server_version);
+
+    if (imp_dbh->pg_server_version < 80000) {
+        /*
+          Special workaround for PgBouncer, which has the unfortunate habit of
+          modifying 'server_version', something it should never do. If we think
+          this is the case for the version failure, we simply allow things to
+          continue with a faked version. See github issue #47
+        */
+        /* PQparameterStatus returns NULL for an unknown parameter; never hand that to strstr */
+        version_string = PQparameterStatus(imp_dbh->conn, "server_version");
+        if (NULL != version_string && NULL != strstr(version_string, "bouncer")) {
+            imp_dbh->pg_server_version = 90600;
+        }
+        else {
+            TRACE_PQERRORMESSAGE;
+            strncpy(imp_dbh->sqlstate, "08001", 6); /* sqlclient_unable_to_establish_sqlconnection */
+            pg_error(aTHX_ dbh, CONNECTION_BAD, "Server version 8.0 required");
+            TRACE_PQFINISH;
+            PQfinish(imp_dbh->conn);
+            /* Do not leave a dangling pointer to the freed connection behind */
+            imp_dbh->conn = NULL;
+            sv_free((SV *)imp_dbh->savepoints);
+            if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", THEADER_slow);
+            return;
+        }
+    }
+
+    pg_db_detect_client_encoding_utf8(aTHX_ imp_dbh);
+    /* If the client_encoding is UTF8, flip the utf8 flag until convinced otherwise */
+    imp_dbh->pg_utf8_flag = imp_dbh->client_encoding_utf8;
+
+    /* Tell DBI that we should call disconnect when the handle dies */
+    DBIc_ACTIVE_on(imp_dbh);
+}
+
+int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid,
char * pwd, SV *attr)
+{
dTHR;
dTHX;
char * conn_str;
@@ -102,8 +157,15 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char *
dbname, char * uid, cha
bool inquote = DBDPG_FALSE;
STRLEN connect_string_size;
ConnStatusType connstatus;
+ int async_connect;
+
+ async_connect = want_async_connect(aTHX_ attr);
- if (TSTART_slow) TRC(DBILOGFP, "%sBegin dbd_db_login\n", THEADER_slow);
+ if (TSTART_slow) {
+ TRC(DBILOGFP, "%sBegin dbd_db_login6\n", THEADER_slow);
+ if (async_connect)
+ TRC(DBILOGFP, "%sAsync connect requested\n", THEADER_slow);
+ }
/* DBD::Pg syntax: 'dbname=dbname;host=host;port=port', 'User', 'Pass' */
/* libpq syntax: 'dbname=dbname host=host port=port user=uid password=pwd'
*/
@@ -172,12 +234,18 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char *
dbname, char * uid, cha
TRACE_PQFINISH;
PQfinish(imp_dbh->conn);
}
-
+
/* Attempt the connection to the database */
if (TLOGIN_slow) TRC(DBILOGFP, "%sLogin connection string: (%s)\n",
THEADER_slow, conn_str);
- TRACE_PQCONNECTDB;
- imp_dbh->conn = PQconnectdb(conn_str);
- if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection complete\n", THEADER_slow);
+ if (async_connect) {
+ TRACE_PQCONNECTSTART;
+ imp_dbh->conn = PQconnectStart(conn_str);
+ if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection started\n", THEADER_slow);
+ } else {
+ TRACE_PQCONNECTDB;
+ imp_dbh->conn = PQconnectdb(conn_str);
+ if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection complete\n",
THEADER_slow);
+ }
Safefree(conn_str);
/* Set the initial sqlstate */
@@ -187,7 +255,8 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char *
dbname, char * uid, cha
/* Check to see that the backend connection was successfully made */
TRACE_PQSTATUS;
connstatus = PQstatus(imp_dbh->conn);
- if (CONNECTION_OK != connstatus) {
+ switch (connstatus) {
+ case CONNECTION_BAD:
TRACE_PQERRORMESSAGE;
strncpy(imp_dbh->sqlstate, "08006", 6); /* "CONNECTION FAILURE" */
pg_error(aTHX_ dbh, connstatus, PQerrorMessage(imp_dbh->conn));
@@ -196,46 +265,15 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char *
dbname, char * uid, cha
sv_free((SV *)imp_dbh->savepoints);
if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n",
THEADER_slow);
return 0;
+
+ case CONNECTION_OK:
+ async_connect = 0;
}
/* Call the pg_warn function anytime this connection raises a notice */
TRACE_PQSETNOTICEPROCESSOR;
(void)PQsetNoticeProcessor(imp_dbh->conn, pg_warn, (void *)SvRV(dbh));
- /* Figure out what protocol this server is using (most likely 3) */
- TRACE_PQPROTOCOLVERSION;
- imp_dbh->pg_protocol = PQprotocolVersion(imp_dbh->conn);
-
- /* Figure out this particular backend's version */
- TRACE_PQSERVERVERSION;
- imp_dbh->pg_server_version = PQserverVersion(imp_dbh->conn);
-
- if (imp_dbh->pg_server_version < 80000) {
- /*
- Special workaround for PgBouncer, which has the unfortunate habit
of modifying 'server_version',
- something it should never do. If we think this is the case for the
version failure, we
- simply allow things to continue with a faked version. See github
issue #47
- */
- if (NULL != strstr(PQparameterStatus(imp_dbh->conn, "server_version"),
"bouncer")) {
- imp_dbh->pg_server_version = 90600;
- }
- else {
- TRACE_PQERRORMESSAGE;
- strncpy(imp_dbh->sqlstate, "08001", 6); /*
sqlclient_unable_to_establish_sqlconnection */
- pg_error(aTHX_ dbh, CONNECTION_BAD, "Server version 8.0 required");
- TRACE_PQFINISH;
- PQfinish(imp_dbh->conn);
- sv_free((SV *)imp_dbh->savepoints);
- if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n",
THEADER_slow);
- return 0;
- }
- }
-
- pg_db_detect_client_encoding_utf8(aTHX_ imp_dbh);
-
- /* If the client_encoding is UTF8, flip the utf8 flag until convinced
otherwise */
- imp_dbh->pg_utf8_flag = imp_dbh->client_encoding_utf8;
-
imp_dbh->pg_enable_utf8 = -1;
imp_dbh->prepare_now = DBDPG_FALSE;
@@ -252,18 +290,18 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char *
dbname, char * uid, cha
imp_dbh->copystate = 0;
imp_dbh->copybinary = DBDPG_FALSE;
imp_dbh->pg_errorlevel = 1; /* Default */
- imp_dbh->async_status = 0;
+ imp_dbh->async_status = DBH_NO_ASYNC;
imp_dbh->async_sth = NULL;
imp_dbh->last_result = NULL; /* NULL or the last PGresult returned
by a database or statement handle */
imp_dbh->result_clearable = DBDPG_TRUE;
imp_dbh->pg_int8_as_string = DBDPG_FALSE;
imp_dbh->skip_deallocate = DBDPG_FALSE;
- /* Tell DBI that we should call destroy when the handle dies */
- DBIc_IMPSET_on(imp_dbh);
-
- /* Tell DBI that we should call disconnect when the handle dies */
- DBIc_ACTIVE_on(imp_dbh);
+ /* if not connecting asynchronously, do after connect init */
+ imp_dbh->pg_protocol = -1;
+ imp_dbh->pg_server_version = -1;
+ if (async_connect) imp_dbh->async_status = DBH_ASYNC_CONNECT;
+ else after_connect_init(aTHX_ dbh, imp_dbh);
if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login\n", THEADER_slow);
@@ -271,6 +309,62 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char *
dbname, char * uid, cha
} /* end of dbd_db_login */
+/*
+ * Advance an asynchronous connect that dbd_db_login6 started.
+ *
+ * Returns:
+ *   PGRES_POLLING_READING / PGRES_POLLING_WRITING
+ *        wait until the connection's socket is readable / writable, then
+ *        call again
+ *    0   the connection is established and the handle is initialised
+ *   -1   no asynchronous connect is in progress (an error is raised on dbh)
+ *   -2   the connect failed, or the server was rejected during
+ *        initialisation; the connection has been closed
+ */
+int pg_db_continue_connect(SV *dbh)
+{
+    dTHX;
+    D_imp_dbh(dbh);
+    int status;
+
+    if (TSTART_slow)
+        TRC(DBILOGFP, "%sBegin pg_db_continue_connect\n", THEADER_slow);
+
+    switch (imp_dbh->async_status) {
+    default:
+        pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, "No async connect in progress\n");
+        status = -1;
+        break;
+
+    case DBH_ASYNC_CONNECT:
+        /*
+          First call after PQconnectStart: do not poll yet, just report that
+          the caller has to wait for the socket to become writable.
+        */
+        imp_dbh->async_status = DBH_ASYNC_CONNECT_POLL;
+        status = PGRES_POLLING_WRITING;
+        break;
+
+    case DBH_ASYNC_CONNECT_POLL:
+        TRACE_PQCONNECTPOLL;
+        status = PQconnectPoll(imp_dbh->conn);
+        if (TRACE5_slow) TRC(DBILOGFP, "%sPQconnectPoll returned %d\n", THEADER_slow, status);
+
+        switch (status) {
+        case PGRES_POLLING_READING:
+        case PGRES_POLLING_WRITING:
+            break;
+
+        case PGRES_POLLING_OK:
+            if (TLOGIN_slow) TRC(DBILOGFP, "%sconnection established\n", THEADER_slow);
+
+            imp_dbh->async_status = DBH_NO_ASYNC;
+            after_connect_init(aTHX_ dbh, imp_dbh);
+
+            /*
+              after_connect_init returns nothing. It marks the handle active
+              only on success; on its error path it has already raised the
+              error and closed the connection. Report that as a failure
+              instead of claiming success.
+            */
+            if (DBIc_ACTIVE(imp_dbh)) {
+                status = 0;
+            }
+            else {
+                imp_dbh->conn = NULL;
+                status = -2;
+            }
+            break;
+
+        case PGRES_POLLING_FAILED:
+            TRACE_PQERRORMESSAGE;
+            strncpy(imp_dbh->sqlstate, "08006", 6); /* "CONNECTION FAILURE" */
+            pg_error(aTHX_ dbh, PQstatus(imp_dbh->conn), PQerrorMessage(imp_dbh->conn));
+            TRACE_PQFINISH;
+            PQfinish(imp_dbh->conn);
+            imp_dbh->conn = NULL;
+
+            imp_dbh->async_status = DBH_NO_ASYNC;
+
+            status = -2;
+            break;
+
+        default:
+            /* Any other polling status is handed back to the caller unchanged */
+            break;
+        }
+        break;
+    }
+
+    if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_continue_connect\n", THEADER_slow);
+    return status;
+}
/* ================================================================== */
/*
diff --git a/dbdimp.h b/dbdimp.h
index de74bde..9dc4d08 100644
--- a/dbdimp.h
+++ b/dbdimp.h
@@ -144,6 +144,9 @@ extern void dbd_init (dbistate_t *dbistate);
#define dbd_db_login6 pg_db_login6
int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid,
char * pwd, SV *attr);
+#define dbd_db_continue_connect pg_db_continue_connect
+int dbd_db_continue_connect(SV *h); /* advance an async connect on handle h: 0 = established, -1 = none in progress, -2 = failed, otherwise PGRES_POLLING_READING or PGRES_POLLING_WRITING */
+
#define dbd_db_ping pg_db_ping
int dbd_db_ping(SV *dbh);