Author: turnstep
Date: Thu Jul 12 07:30:57 2007
New Revision: 9728

Added:
   DBD-Pg/trunk/t/08async.t
Modified:
   DBD-Pg/trunk/Pg.h
   DBD-Pg/trunk/Pg.pm
   DBD-Pg/trunk/Pg.xs
   DBD-Pg/trunk/dbdimp.c
   DBD-Pg/trunk/dbdimp.h

Log:
First pass at async support: still a little rough, but a good start.


Modified: DBD-Pg/trunk/Pg.h
==============================================================================
--- DBD-Pg/trunk/Pg.h   (original)
+++ DBD-Pg/trunk/Pg.h   Thu Jul 12 07:30:57 2007
@@ -17,6 +17,9 @@
 
 #define DBDPG_TRUE (bool)1
 #define DBDPG_FALSE (bool)0
+#define DBDPG_ASYNC 1
+#define DBDPG_OLDQUERY_CANCEL 2
+#define DBDPG_OLDQUERY_WAIT 4
 
 #include "libpq-fe.h"
 

Modified: DBD-Pg/trunk/Pg.pm
==============================================================================
--- DBD-Pg/trunk/Pg.pm  (original)
+++ DBD-Pg/trunk/Pg.pm  Thu Jul 12 07:30:57 2007
@@ -26,6 +26,7 @@
 
        %EXPORT_TAGS = 
                (
+                async => [qw(DBDPG_ASYNC DBDPG_OLDQUERY_CANCEL 
DBDPG_OLDQUERY_WAIT)],
                 pg_types => [qw(
                        PG_BOOL PG_BYTEA PG_CHAR PG_INT8 PG_INT2 PG_INT4 
PG_TEXT PG_OID PG_TID
                        PG_FLOAT4 PG_FLOAT8 PG_ABSTIME PG_RELTIME PG_TINTERVAL 
PG_BPCHAR
@@ -40,8 +41,8 @@
                sub new { my $self = {}; return bless $self, shift; }
        }
        $DBDPG_DEFAULT = DBD::Pg::DefaultValue->new();
-       Exporter::export_ok_tags('pg_types');
-       @EXPORT = qw($DBDPG_DEFAULT);
+       Exporter::export_ok_tags('pg_types', 'async');
+       @EXPORT = qw($DBDPG_DEFAULT DBDPG_ASYNC DBDPG_OLDQUERY_CANCEL 
DBDPG_OLDQUERY_WAIT);
 
        require_version DBI 1.45;
 
@@ -78,17 +79,24 @@
                });
 
 
+               DBD::Pg::db->install_method("pg_cancel");
                DBD::Pg::db->install_method("pg_endcopy");
                DBD::Pg::db->install_method("pg_getline");
                DBD::Pg::db->install_method("pg_ping");
                DBD::Pg::db->install_method("pg_putline");
+               DBD::Pg::db->install_method("pg_ready");
                DBD::Pg::db->install_method("pg_release");
+               DBD::Pg::db->install_method("pg_result");
                DBD::Pg::db->install_method("pg_rollback_to");
                DBD::Pg::db->install_method("pg_savepoint");
                DBD::Pg::db->install_method("pg_server_trace");
                DBD::Pg::db->install_method("pg_server_untrace");
                DBD::Pg::db->install_method("pg_type_info");
 
+               DBD::Pg::st->install_method("pg_cancel");
+               DBD::Pg::st->install_method("pg_result");
+               DBD::Pg::st->install_method("pg_ready");
+
                $drh;
 
        }
@@ -3085,6 +3093,643 @@
 
 =back
 
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other 
work while the query is 
+running on the backend. Both queries sent by the do() method, and by the 
execute() method can be 
+sent asynchronously. The basic usage is as follows:
+
+  use DBD::Pg ':async';
+
+  print "Async do() example:\n";
+  $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+  do_something_else();
+  {
+    if ($dbh->pg_ready()) {
+      $res = $pg_result();
+      print "Result of do(): $res\n";
+    }
+    print "Query is still running...\n";
+    if (cancel_request_received) {
+      $dbh->pg_cancel();
+    }
+    sleep 1;
+    redo;
+  }
+
+  print "Async prepare/execute example:\n";
+  $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+  $sth->execute();
+
+  ## Changed our mind, cancel and run again:
+  $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC + 
DBDPG_OLDQUERY_CANCEL});
+  $sth->execute();
+
+  do_something_else();
+
+  if (!$sth->pg_ready) {
+    do_another_thing();
+  }
+
+  ## We wait until it is done, and get the result:
+  $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can 
import all of them by putting 
+either of these at the top of your script:
+
+  use DBD::Pg;
+
+  use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants 
is recommended as it 
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the 
prepare() method as a value 
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the 
prepare method(), it causes any 
+currently running asynchronous query to be cancelled and rolled back. It has 
no effect if no asynchronous 
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the 
prepare method(), it waits for any 
+currently running asynchronous query to complete. It has no effect if no 
asynchronous query is currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running 
asynchronous query. It returns true if 
+the cancel suceeded, and false otherwise. Note that a query that has finished 
before this method is executed 
+will also return false. B<Warning>: a successful cancellation will leave the 
database in an unusable state, 
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+  $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as 
a statement handle method. Both simply 
+see if a previously issued asycnhronous query has completed yet. It returns 
true if the statement has finished, in which 
+case you should then call the pg_result() method. Calls to <pg_ready() should 
only be used when you have other 
+things to do while the query is running. If you simply want to wait until the 
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+  my $time = 0;
+  while (!$dbh->pg_ready) {
+    print "Query is still running. Seconds: $time\n";
+    $time++;
+    sleep 1;
+  }
+  $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued 
asynchronous query. If the query is still 
+running, this method will wait until it has finished. The result returned is 
the number of rows: the same thing 
+that would have been returned by the asynchronous do() or execute() if it had 
been without an 
+asychronous flag.
+
+  $result = $dbh->pg_result;
+
+=back
+
+=head2 Savepoints
+
+PostgreSQL version 8.0 introduced the concept of savepoints, which allows 
+transactions to be rolled back to a certain point without affecting the 
+rest of the transaction. DBD::Pg encourages using the following methods to 
+control savepoints:
+
+=over 4
+
+=item B<pg_savepoint>
+
+Creates a savepoint. This will fail unless you are inside of a transaction. 
The 
+only argument is the name of the savepoint. Note that PostgreSQL DOES allow 
+multiple savepoints with the same name to exist.
+
+  $dbh->pg_savepoint("mysavepoint");
+
+=item B<pg_rollback_to>
+
+Rolls the database back to a named savepoint, discarding any work performed 
after 
+that point. If more than one savepoint with that name exists, rolls back to 
the 
+most recently created one.
+
+  $dbh->pg_rollback_to("mysavepoint");
+
+=item B<pg_release>
+
+Releases (or removes) a named savepoint. If more than one savepoint with that 
name 
+exists, it will only destroy the most recently created one. Note that all 
savepoints 
+created after the one being released are also destroyed.
+
+  $dbh->pg_release("mysavepoint");
+
+=back
+
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other 
work while the query is 
+running on the backend. Both queries sent by the do() method, and by the 
execute() method can be 
+sent asynchronously. The basic usage is as follows:
+
+  use DBD::Pg ':async';
+
+  print "Async do() example:\n";
+  $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+  do_something_else();
+  {
+    if ($dbh->pg_ready()) {
+      $res = $pg_result();
+      print "Result of do(): $res\n";
+    }
+    print "Query is still running...\n";
+    if (cancel_request_received) {
+      $dbh->pg_cancel();
+    }
+    sleep 1;
+    redo;
+  }
+
+  print "Async prepare/execute example:\n";
+  $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+  $sth->execute();
+
+  ## Changed our mind, cancel and run again:
+  $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC + 
DBDPG_OLDQUERY_CANCEL});
+  $sth->execute();
+
+  do_something_else();
+
+  if (!$sth->pg_ready) {
+    do_another_thing();
+  }
+
+  ## We wait until it is done, and get the result:
+  $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can 
import all of them by putting 
+either of these at the top of your script:
+
+  use DBD::Pg;
+
+  use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants 
is recommended as it 
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the 
prepare() method as a value 
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the 
prepare method(), it causes any 
+currently running asynchronous query to be cancelled and rolled back. It has 
no effect if no asynchronous 
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the 
prepare method(), it waits for any 
+currently running asynchronous query to complete. It has no effect if no 
asynchronous query is currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running 
asynchronous query. It returns true if 
+the cancel suceeded, and false otherwise. Note that a query that has finished 
before this method is executed 
+will also return false. B<Warning>: a successful cancellation will leave the 
database in an unusable state, 
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+  $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as 
a statement handle method. Both simply 
+see if a previously issued asycnhronous query has completed yet. It returns 
true if the statement has finished, in which 
+case you should then call the pg_result() method. Calls to <pg_ready() should 
only be used when you have other 
+things to do while the query is running. If you simply want to wait until the 
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+  my $time = 0;
+  while (!$dbh->pg_ready) {
+    print "Query is still running. Seconds: $time\n";
+    $time++;
+    sleep 1;
+  }
+  $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued 
asynchronous query. If the query is still 
+running, this method will wait until it has finished. The result returned is 
the number of rows: the same thing 
+that would have been returned by the asynchronous do() or execute() if it had 
been without an 
+asychronous flag.
+
+  $result = $dbh->pg_result;
+
+=back
+
+=head2 Savepoints
+
+PostgreSQL version 8.0 introduced the concept of savepoints, which allows 
+transactions to be rolled back to a certain point without affecting the 
+rest of the transaction. DBD::Pg encourages using the following methods to 
+control savepoints:
+
+=over 4
+
+=item B<pg_savepoint>
+
+Creates a savepoint. This will fail unless you are inside of a transaction. 
The 
+only argument is the name of the savepoint. Note that PostgreSQL DOES allow 
+multiple savepoints with the same name to exist.
+
+  $dbh->pg_savepoint("mysavepoint");
+
+=item B<pg_rollback_to>
+
+Rolls the database back to a named savepoint, discarding any work performed 
after 
+that point. If more than one savepoint with that name exists, rolls back to 
the 
+most recently created one.
+
+  $dbh->pg_rollback_to("mysavepoint");
+
+=item B<pg_release>
+
+Releases (or removes) a named savepoint. If more than one savepoint with that 
name 
+exists, it will only destroy the most recently created one. Note that all 
savepoints 
+created after the one being released are also destroyed.
+
+  $dbh->pg_release("mysavepoint");
+
+=back
+
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other 
work while the query is 
+running on the backend. Both queries sent by the do() method, and by the 
execute() method can be 
+sent asynchronously. The basic usage is as follows:
+
+  use DBD::Pg ':async';
+
+  print "Async do() example:\n";
+  $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+  do_something_else();
+  {
+    if ($dbh->pg_ready()) {
+      $res = $pg_result();
+      print "Result of do(): $res\n";
+    }
+    print "Query is still running...\n";
+    if (cancel_request_received) {
+      $dbh->pg_cancel();
+    }
+    sleep 1;
+    redo;
+  }
+
+  print "Async prepare/execute example:\n";
+  $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+  $sth->execute();
+
+  ## Changed our mind, cancel and run again:
+  $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC + 
DBDPG_OLDQUERY_CANCEL});
+  $sth->execute();
+
+  do_something_else();
+
+  if (!$sth->pg_ready) {
+    do_another_thing();
+  }
+
+  ## We wait until it is done, and get the result:
+  $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can 
import all of them by putting 
+either of these at the top of your script:
+
+  use DBD::Pg;
+
+  use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants 
is recommended as it 
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the 
prepare() method as a value 
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the 
prepare method(), it causes any 
+currently running asynchronous query to be cancelled and rolled back. It has 
no effect if no asynchronous 
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the 
prepare method(), it waits for any 
+currently running asynchronous query to complete. It has no effect if no 
asynchronous query is currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running 
asynchronous query. It returns true if 
+the cancel suceeded, and false otherwise. Note that a query that has finished 
before this method is executed 
+will also return false. B<Warning>: a successful cancellation will leave the 
database in an unusable state, 
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+  $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as 
a statement handle method. Both simply 
+see if a previously issued asycnhronous query has completed yet. It returns 
true if the statement has finished, in which 
+case you should then call the pg_result() method. Calls to <pg_ready() should 
only be used when you have other 
+things to do while the query is running. If you simply want to wait until the 
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+  my $time = 0;
+  while (!$dbh->pg_ready) {
+    print "Query is still running. Seconds: $time\n";
+    $time++;
+    sleep 1;
+  }
+  $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued 
asynchronous query. If the query is still 
+running, this method will wait until it has finished. The result returned is 
the number of rows: the same thing 
+that would have been returned by the asynchronous do() or execute() if it had 
been without an 
+asychronous flag.
+
+  $result = $dbh->pg_result;
+
+=back
+
+=head2 Savepoints
+
+PostgreSQL version 8.0 introduced the concept of savepoints, which allows 
+transactions to be rolled back to a certain point without affecting the 
+rest of the transaction. DBD::Pg encourages using the following methods to 
+control savepoints:
+
+=over 4
+
+=item B<pg_savepoint>
+
+Creates a savepoint. This will fail unless you are inside of a transaction. 
The 
+only argument is the name of the savepoint. Note that PostgreSQL DOES allow 
+multiple savepoints with the same name to exist.
+
+  $dbh->pg_savepoint("mysavepoint");
+
+=item B<pg_rollback_to>
+
+Rolls the database back to a named savepoint, discarding any work performed 
after 
+that point. If more than one savepoint with that name exists, rolls back to 
the 
+most recently created one.
+
+  $dbh->pg_rollback_to("mysavepoint");
+
+=item B<pg_release>
+
+Releases (or removes) a named savepoint. If more than one savepoint with that 
name 
+exists, it will only destroy the most recently created one. Note that all 
savepoints 
+created after the one being released are also destroyed.
+
+  $dbh->pg_release("mysavepoint");
+
+=back
+
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other 
work while the query is 
+running on the backend. Both queries sent by the do() method, and by the 
execute() method can be 
+sent asynchronously. The basic usage is as follows:
+
+  use DBD::Pg ':async';
+
+  print "Async do() example:\n";
+  $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+  do_something_else();
+  {
+    if ($dbh->pg_ready()) {
+      $res = $pg_result();
+      print "Result of do(): $res\n";
+    }
+    print "Query is still running...\n";
+    if (cancel_request_received) {
+      $dbh->pg_cancel();
+    }
+    sleep 1;
+    redo;
+  }
+
+  print "Async prepare/execute example:\n";
+  $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+  $sth->execute();
+
+  ## Changed our mind, cancel and run again:
+  $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC + 
DBDPG_OLDQUERY_CANCEL});
+  $sth->execute();
+
+  do_something_else();
+
+  if (!$sth->pg_ready) {
+    do_another_thing();
+  }
+
+  ## We wait until it is done, and get the result:
+  $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can 
import all of them by putting 
+either of these at the top of your script:
+
+  use DBD::Pg;
+
+  use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants 
is recommended as it 
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the 
prepare() method as a value 
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the 
prepare method(), it causes any 
+currently running asynchronous query to be cancelled and rolled back. It has 
no effect if no asynchronous 
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the 
prepare method(), it waits for any 
+currently running asynchronous query to complete. It has no effect if there is 
no asynchronous query currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running 
asynchronous query. It returns true if 
+the cancel suceeded, and false otherwise. Note that a query that has finished 
before this method is executed 
+will also return false. B<WARNING>: a successful cancellation will leave the 
database in an unusable state, 
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+  $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as 
a statement handle method. Both simply 
+see if a previously issued asycnhronous query has completed yet. It returns 
true if the statement has finished, in which 
+case you should then call the pg_result() method. Calls to pg_ready() should 
only be used when you have other 
+things to do while the query is running. If you simply want to wait until the 
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+  my $time = 0;
+  while (!$dbh->pg_ready) {
+    print "Query is still running. Seconds: $time\n";
+    $time++;
+    sleep 1;
+  }
+  $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued 
asynchronous query. If the query is still 
+running, this method will wait until it has finished. The result returned is 
the number of rows: the same thing 
+that would have been returned by the asynchronous do() or execute() if it had 
been called without an asychronous flag.
+
+  $result = $dbh->pg_result;
+
+=back
+
+=head3 Asynchronous Examples
+
+Here are some working examples of asynchronous queries. Note that we'll use 
the pg_sleep function to emulate a 
+long-running query.
+
+  use strict;
+  use warnings;
+  use Time::HiRes 'sleep';
+  use DBD::Pg ':async';
+
+  my $dbh = DBI->connect('dbi:Pg:dbname=postgres', 'postgres', '', 
{AutoCommit=>0,RaiseError=>1});
+
+  ## Kick off a long running query on the first database:
+  my $sth = $dbh->prepare("SELECT pg_sleep(?)", {pg_async => DBDPG_ASYNC});
+  $sth->execute(5);
+
+  ## While that is running, do some other things
+  print "Your query is processing. Thanks for waiting\n";
+  check_on_the_kids(); ## Expensive sub, takes at least three seconds.
+
+  while (!$dbh->pg_ready) {
+    check_on_the_kids();
+    ## If the above function returns quickly for some reason, we add a small 
sleep
+    sleep 0.1;
+  }
+
+  print "The query has finished. Gathering results\n";
+  my $result = $sth->pg_result;
+  print "Result: $result\n";
+  my $info = $sth->fetchall_arrayref();
+
+Without asynchronous queries, the above script would take about 8 seconds to 
run: five seconds waiting 
+for the execute to finish, then three for the check_on_the_kids() function to 
return. With asynchronous 
+queries, the script takes about 6 seconds to run, and gets in two iterations 
of check_on_the_kids in 
+the process.
+
+Here's an example showing the ability to cancel a long-running query. Imagine 
two slave databases in 
+different geographic locations over a slow network. You need information as 
quickly as possible, so 
+you query both at once. When you get an answer, you tell the other one to stop 
working on your query, 
+as you don't need it anymore.
+
+  use strict;
+  use warnings;
+  use Time::HiRes 'sleep';
+  use DBD::Pg ':async';
+
+  my $dbhslave1 = DBI->connect('dbi:Pg:dbname=postgres;host=slave1', 
'postgres', '', {AutoCommit=>0,RaiseError=>1});
+  my $dbhslave2 = DBI->connect('dbi:Pg:dbname=postgres;host=slave2', 
'postgres', '', {AutoCommit=>0,RaiseError=>1});
+
+  $SQL = "SELECT count(*) FROM largetable WHERE flavor='blueberry'";
+
+  my $sth1 = $dbhslave1->prepare($SQL, {pg_async => DBDPG_ASYNC});
+  my $sth2 = $dbhslave2->prepare($SQL, {pg_async => DBDPG_ASYNC});
+
+  $sth1->execute();
+  $sth2->execute();
+
+  my $winner;
+  while (!defined $winner) {
+    if ($sth1->pg_ready) {
+      $winner = 1;
+    }
+    elsif ($sth2->pg_ready) {
+      $winner = 2;
+    }
+    Time::HiRes::sleep 0.05;
+  }
+
+  my $count;
+  if ($winner == 1) {
+    $sth2->pg_cancel();
+    $sth1->pg_result();
+    $count = $sth1->fetchall_arrayref()->[0][0];
+  }
+  else {
+    $sth1->pg_cancel();
+    $sth2->pg_result();
+    $count = $sth2->fetchall_arrayref()->[0][0];
+  }
+
+
 =head2 COPY support
 
 DBD::Pg supports the COPY command through three functions: pg_putline, 

Modified: DBD-Pg/trunk/Pg.xs
==============================================================================
--- DBD-Pg/trunk/Pg.xs  (original)
+++ DBD-Pg/trunk/Pg.xs  Thu Jul 12 07:30:57 2007
@@ -61,6 +61,10 @@
        PG_OID       = 26
        PG_TID       = 27
 
+       DBDPG_ASYNC           = DBDPG_ASYNC
+       DBDPG_OLDQUERY_CANCEL = DBDPG_OLDQUERY_CANCEL
+       DBDPG_OLDQUERY_WAIT   = DBDPG_OLDQUERY_WAIT
+
        CODE:
                if (0==ix) {
                        if (!name) {
@@ -171,27 +175,34 @@
        CODE:
        {
                int retval;
+               int asyncflag = 0;
+               SV **svp;
 
                if (strlen(statement)<1) { /* Corner case */
                        XST_mUNDEF(0);
                        return;
                }
 
-               if (items < 3) { /* No attribs, no arguments */
+               if (attr && SvROK(attr) && SvTYPE(SvRV(attr)) == SVt_PVHV) {
+                       if ((svp = hv_fetch((HV*)SvRV(attr),"pg_async", 8, 0)) 
!= NULL) {
+                          asyncflag = SvIV(*svp);
+                       }
+               }
+               if (items < 4) { /* No bind arguments */
                        /* Quick run via PQexec */
-                       retval = pg_quickexec(dbh, statement);
+                       retval = pg_quickexec(dbh, statement, asyncflag);
                }
-               else { /* The normal, slower way */
+               else { /* We've got bind arguments, so we do the whole 
prepare/execute route */
                        imp_sth_t *imp_sth;
                        SV * sth = dbixst_bounce_method("prepare", 3);
                        if (!SvROK(sth))
                                XSRETURN_UNDEF;
                        imp_sth = (imp_sth_t*)(DBIh_COM(sth));
-                       if (items > 3)
-                               if (!dbdxst_bind_params(sth, imp_sth, items-2, 
ax+2))
-                                       XSRETURN_UNDEF;
+                       if (!dbdxst_bind_params(sth, imp_sth, items-2, ax+2))
+                               XSRETURN_UNDEF;
                        imp_sth->server_prepare = 1;
                        imp_sth->onetime = 1; /* Overrides the above at actual 
PQexec* decision time */
+                       imp_sth->async_flag = asyncflag;
                        retval = dbd_st_execute(sth, imp_sth);
                }
 
@@ -467,6 +478,34 @@
                ST(0) = sv_2mortal( newSViv( type_num ) );
        }
 
+void
+pg_result(dbh)
+       SV * dbh
+       CODE:
+               int ret;
+               D_imp_dbh(dbh);
+               ret = dbdpg_result(dbh, imp_dbh);
+               if (ret == 0)
+                       XST_mPV(0, "0E0");
+               else if (ret < -1)
+                       XST_mUNDEF(0);
+               else
+                       XST_mIV(0, ret);
+
+void
+pg_ready(dbh)
+       SV *dbh
+       CODE:
+               D_imp_dbh(dbh);
+               ST(0) = sv_2mortal(newSViv(dbdpg_ready(dbh, imp_dbh)));
+
+void
+pg_cancel(dbh)
+       SV *dbh
+       CODE:
+       D_imp_dbh(dbh);
+       ST(0) = dbdpg_cancel(dbh, imp_dbh) ? &sv_yes : &sv_no;
+
 # -- end of DBD::Pg::db
 
 
@@ -482,4 +521,35 @@
                D_imp_dbh_from_sth;
                ST(0) = strEQ(imp_dbh->sqlstate,"00000") ? &sv_no : 
newSVpv(imp_dbh->sqlstate, 5);
 
+void
+pg_ready(sth)
+       SV *sth
+       CODE:
+               D_imp_sth(sth);
+               D_imp_dbh_from_sth;
+               ST(0) = sv_2mortal(newSViv(dbdpg_ready(sth, imp_dbh)));
+
+void
+pg_cancel(sth)
+       SV *sth
+       CODE:
+       D_imp_sth(sth);
+       ST(0) = dbdpg_cancel_sth(sth, imp_sth) ? &sv_yes : &sv_no;
+
+void
+pg_result(sth)
+       SV * sth
+       CODE:
+               int ret;
+               D_imp_sth(sth);
+               D_imp_dbh_from_sth;
+               ret = dbdpg_result(sth, imp_dbh);
+               if (ret == 0)
+                       XST_mPV(0, "0E0");
+               else if (ret < -1)
+                       XST_mUNDEF(0);
+               else
+                       XST_mIV(0, ret);
+
+
 # end of Pg.xs

Modified: DBD-Pg/trunk/dbdimp.c
==============================================================================
--- DBD-Pg/trunk/dbdimp.c       (original)
+++ DBD-Pg/trunk/dbdimp.c       Thu Jul 12 07:30:57 2007
@@ -68,6 +68,7 @@
 static int dbd_st_deallocate_statement(SV *sth, imp_sth_t *imp_sth);
 static PGTransactionStatusType dbd_db_txn_status (imp_dbh_t *imp_dbh);
 static int pg_db_start_txn (SV *dbh, imp_dbh_t *imp_dbh);
+static int handle_old_async(SV * handle, imp_dbh_t * imp_dbh, int asyncflag);
 
 DBISTATE_DECLARE;
 
@@ -221,6 +222,8 @@
        imp_dbh->prepare_number = 1;
        imp_dbh->copystate      = 0;
        imp_dbh->pg_errorlevel  = 1; /* Default */
+       imp_dbh->async_status   = 0;
+       imp_dbh->async_sth      = NULL;
 
        /* If the server can handle it, we default to "smart", otherwise "off" 
*/
        imp_dbh->server_prepare = imp_dbh->pg_protocol >= 3 ? 
@@ -330,8 +333,7 @@
          we are connecting over TCP/IP, only set it here if non-null, and fall 
through 
          to a better default value below.
     */
-       if (result
-               && NULL != PQresultErrorField(result,PG_DIAG_SQLSTATE)) {
+       if (result && NULL != PQresultErrorField(result,PG_DIAG_SQLSTATE)) {
                strncpy(imp_dbh->sqlstate, 
PQresultErrorField(result,PG_DIAG_SQLSTATE), 5);
                imp_dbh->sqlstate[5] = '\0';
                stateset = DBDPG_TRUE;
@@ -534,11 +536,18 @@
 /* ================================================================== */
 void dbd_db_destroy (SV * dbh, imp_dbh_t * imp_dbh)
 {
-       if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
dbd_db_destroy\n"); }
+       if (dbis->debug >= 4)
+               (void)PerlIO_printf(DBILOGFP, "dbdpg: dbd_db_destroy\n");
 
        if (DBIc_ACTIVE(imp_dbh))
                (void)dbd_db_disconnect(dbh, imp_dbh);
 
+       if (imp_dbh->async_sth) { /* Just in case */
+               if (imp_dbh->async_sth->result)
+                       PQclear(imp_dbh->async_sth->result);
+               imp_dbh->async_sth = NULL;
+       }
+
        av_undef(imp_dbh->savepoints);
        sv_free((SV *)imp_dbh->savepoints);
        Safefree(imp_dbh->sqlstate);
@@ -635,10 +644,12 @@
 #endif
                break;
 
-       case 15: /* pg_default_port */
+       case 15: /* pg_default_port pg_async_status */
 
                if (strEQ("pg_default_port", key))
                        retsv = newSViv((IV) PGDEFPORT );
+               else if (strEQ("pg_async_status", key))
+                       retsv = newSViv((IV)imp_dbh->async_status);
                break;
 
        case 17: /* pg_server_prepare  pg_server_version */
@@ -925,9 +936,11 @@
                }
                break;
 
-       case 8: /* NULLABLE */
+       case 8: /* pg_async  NULLABLE */
 
-               if (strEQ("NULLABLE", key)) {
+               if (strEQ("pg_async", key))
+                       retsv = newSViv((IV)imp_sth->async_flag);
+               else if (strEQ("NULLABLE", key)) {
                        AV *av = newAV();
                        PGresult *result;
                        int status = -1;
@@ -1038,6 +1051,13 @@
        
        switch (kl) {
 
+       case 8: /* pg_async */
+
+               if (strEQ("pg_async", key)) {
+                       imp_sth->async_flag = SvIV(valuesv);
+                       return 1;
+               }
+
        case 14: /* pg_prepare_now */
 
                if (strEQ("pg_prepare_now", key)) {
@@ -1094,7 +1114,7 @@
 } /* end of dbd_discon_all */
 
 
-// XXX IS this really needed die to $dbh->{pg_socket}?
+/* Deprecated in favor of $dbh->{pg_socket} */
 /* ================================================================== */
 int dbd_db_getfd (SV * dbh, imp_dbh_t * imp_dbh)
 {
@@ -1160,6 +1180,8 @@
        imp_sth->cur_tuple        = 0;
        imp_sth->rows             = -1; /* per DBI spec */
        imp_sth->totalsize        = 0;
+       imp_sth->async_flag       = 0;
+       imp_sth->async_status     = 0;
        imp_sth->prepare_name     = NULL;
        imp_sth->firstword        = NULL;
        imp_sth->result           = NULL;
@@ -1199,6 +1221,9 @@
                if ((svp = 
hv_fetch((HV*)SvRV(attribs),"pg_placeholder_dollaronly", 25, 0)) != NULL) {
                        imp_sth->dollaronly = SvTRUE(*svp) ? DBDPG_TRUE : 
DBDPG_FALSE;
                }
+               if ((svp = hv_fetch((HV*)SvRV(attribs),"pg_async", 8, 0)) != 
NULL) {
+                 imp_sth->async_flag = SvIV(*svp);
+               }
        }
 
        /* Figure out the first word in the statement */
@@ -1869,9 +1894,10 @@
                }
                result = PQprepare(imp_dbh->conn, imp_sth->prepare_name, 
statement, params, paramTypes);
                Safefree(paramTypes);
-               if (result)
+               if (result) {
                        status = PQresultStatus(result);
-               PQclear(result);
+                       PQclear(result);
+               }
                if (dbis->debug >= 6)
                        (void)PerlIO_printf(DBILOGFP, "dbdpg: Using PQprepare: 
%s\n", statement);
        }
@@ -2093,7 +2119,7 @@
 
 
 /* ================================================================== */
-int pg_quickexec (SV * dbh, const char * sql)
+int pg_quickexec (SV * dbh, const char * sql, int asyncflag)
 {
        D_imp_dbh(dbh);
        PGresult *     result;
@@ -2102,7 +2128,8 @@
        int            rows = 0;
 
        if (dbis->debug >= 4)
-               (void)PerlIO_printf(DBILOGFP, "dbdpg: pg_quickexec (%s)\n", 
sql);
+               (void)PerlIO_printf(DBILOGFP, "dbdpg: dbdpg_quickexec 
query=(%s) async=(%d) async_status=(%d)\n",
+                                                       sql, asyncflag, 
imp_dbh->async_status);
 
        if (NULL == imp_dbh->conn)
                croak("execute on disconnected handle");
@@ -2111,6 +2138,14 @@
        if (imp_dbh->copystate!=0)
                croak("Must call pg_endcopy before issuing more commands");
 
+       /* If we are still waiting on an async, handle it */
+       if (imp_dbh->async_status) {
+         if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg: handling 
old async\n");
+         rows = handle_old_async(dbh, imp_dbh, asyncflag);
+         if (rows)
+               return rows;
+       }
+
        /* If not autocommit, start a new transaction */
        if (!imp_dbh->done_begin && !DBIc_has(imp_dbh, DBIcf_AutoCommit)) {
                status = _result(imp_dbh, "begin");
@@ -2121,6 +2156,20 @@
                imp_dbh->done_begin = DBDPG_TRUE;
        }
 
+       /* Asynchronous commands get kicked off and return undef */
+       if (asyncflag & DBDPG_ASYNC) {
+         if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg: Going 
asychronous with do()\n");
+         if (! PQsendQuery(imp_dbh->conn, sql)) {
+               if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg: 
PQsendQuery failed\n");
+               pg_error(dbh, status, PQerrorMessage(imp_dbh->conn));
+               return -2;
+         }
+         imp_dbh->async_status = 1;
+         imp_dbh->async_sth = NULL; // Needed?
+         if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg: 
PQsendQuery worked\n");
+         return 0;
+       }
+
        result = PQexec(imp_dbh->conn, sql);
        status = _sqlstate(imp_dbh, result);
 
@@ -2205,6 +2254,17 @@
                }
        }
 
+       /* Check for old async transactions */
+       if (imp_dbh->async_status) {
+         if (dbis->debug >= 7) {
+               (void)PerlIO_printf
+                 (DBILOGFP, "dbdpg: Attempting to handle existing async 
transaction\n");
+         }       
+         ret = handle_old_async(sth, imp_dbh, imp_sth->async_flag);
+         if (ret)
+               return ret;
+       }
+
        /* If not autocommit, start a new transaction */
        if (!imp_dbh->done_begin && !DBIc_has(imp_dbh, DBIcf_AutoCommit)) {
                status = _result(imp_dbh, "begin");
@@ -2216,8 +2276,10 @@
        }
 
        /* clear old result (if any) */
-       if (imp_sth->result != NULL)
+       if (imp_sth->result) {
                PQclear(imp_sth->result);
+               imp_sth->result = NULL;
+       }
 
        /*
          Now, we need to build the statement to send to the backend
@@ -2348,7 +2410,11 @@
                
                if (dbis->debug >= 5)
                        (void)PerlIO_printf(DBILOGFP, "dbdpg: Running 
PQexecPrepared with (%s)\n", imp_sth->prepare_name);
-               imp_sth->result = PQexecPrepared
+               if (imp_sth->async_flag & DBDPG_ASYNC)
+                 ret = PQsendQueryPrepared
+                       (imp_dbh->conn, imp_sth->prepare_name, imp_sth->numphs, 
paramValues, paramLengths, paramFormats, 0);
+               else
+                 imp_sth->result = PQexecPrepared
                        (imp_dbh->conn, imp_sth->prepare_name, imp_sth->numphs, 
paramValues, paramLengths, paramFormats, 0);
 
        } /* end new-style prepare */
@@ -2412,7 +2478,11 @@
 
                        if (dbis->debug >= 5)
                                (void)PerlIO_printf(DBILOGFP, "dbdpg: Running 
PQexecParams with (%s)\n", statement);
-                       imp_sth->result = PQexecParams
+                       if (imp_sth->async_flag & DBDPG_ASYNC)
+                         ret = PQsendQueryParams
+                               (imp_dbh->conn, statement, imp_sth->numphs, 
paramTypes, paramValues, paramLengths, paramFormats, 0);
+                       else
+                         imp_sth->result = PQexecParams
                                (imp_dbh->conn, statement, imp_sth->numphs, 
paramTypes, paramValues, paramLengths, paramFormats, 0);
                        Safefree(paramTypes);
                }
@@ -2440,9 +2510,13 @@
                        statement[execsize] = '\0';
 
                        if (dbis->debug >= 5)
-                               (void)PerlIO_printf(DBILOGFP, "dbdpg: Running 
PQexec with (%s)\n", statement);
+                               (void)PerlIO_printf(DBILOGFP, "dbdpg: Running 
%s with (%s)\n", 
+                                                                       
imp_sth->async_flag & 1 ? "PQsendQuery" : "PQexec", statement);
                        
-                       imp_sth->result = PQexec(imp_dbh->conn, statement);
+                       if (imp_sth->async_flag & DBDPG_ASYNC)
+                         ret = PQsendQuery(imp_dbh->conn, statement);
+                       else
+                         imp_sth->result = PQexec(imp_dbh->conn, statement);
 
                } /* end PQexec */
 
@@ -2450,14 +2524,25 @@
 
        } /* end non-prepared exec */
 
-       /* Some form of PQexec has been run at this point */
-
-       status = _sqlstate(imp_dbh, imp_sth->result);
+       /* Some form of PQexec/PQsendQuery has been run at this point */
 
        Safefree(paramValues);
        Safefree(paramLengths);
        Safefree(paramFormats);                 
 
+       /* If running asynchronously, we don't stick around for the result */
+       if (imp_sth->async_flag & DBDPG_ASYNC) {
+               if (dbis->debug >= 2)
+                 (void)PerlIO_printf
+                       (DBILOGFP, "dbdpg: Early return for async query");
+               imp_dbh->async_status = 1;
+               imp_sth->async_status = 1;
+               imp_dbh->async_sth = imp_sth;
+               return 0;
+       }
+
+       status = _sqlstate(imp_dbh, imp_sth->result);
+
        imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise 
*/
        if (PGRES_TUPLES_OK == status) {
                num_fields = PQnfields(imp_sth->result);
@@ -2641,8 +2726,11 @@
 int dbd_st_finish (SV * sth, imp_sth_t * imp_sth)
 {
        
+       D_imp_dbh_from_sth;
+
        if (dbis->debug >= 4)
-               (void)PerlIO_printf(DBILOGFP, "dbdpg: dbd_st_finish sth=%d\n", 
sth);
+               (void)PerlIO_printf(DBILOGFP, "dbdpg: dbdpg_finish sth=%d 
async=%d\n",
+                                                       sth, 
imp_dbh->async_status);
        
        if (DBIc_ACTIVE(imp_sth) && imp_sth->result) {
                PQclear(imp_sth->result);
@@ -2650,6 +2738,16 @@
                imp_sth->rows = 0;
        }
        
+       /* Are we in the middle of an async for this statement handle? */
+       if (imp_dbh->async_status) {
+         if (imp_sth->async_status) {
+               handle_old_async(sth, imp_dbh, DBDPG_OLDQUERY_WAIT);
+         }
+       }
+
+       imp_sth->async_status = 0;
+       imp_dbh->async_sth = NULL;
+
        DBIc_ACTIVE_off(imp_sth);
        return 1;
 
@@ -2759,6 +2857,10 @@
                return;
        }
 
+       if (imp_dbh->async_status) {
+         handle_old_async(sth, imp_dbh, DBDPG_OLDQUERY_WAIT);
+       }
+
        /* Deallocate only if we named this statement ourselves and we still 
have a good connection */
        /* On rare occasions, dbd_db_destroy is called first and we can no 
longer rely on imp_dbh */
        if (imp_sth->prepared_by_us && DBIc_ACTIVE(imp_dbh)) {
@@ -2772,7 +2874,7 @@
        Safefree(imp_sth->type_info);
        Safefree(imp_sth->firstword);
 
-       if (NULL != imp_sth->result) {
+       if (imp_sth->result) {
                PQclear(imp_sth->result);
                imp_sth->result = NULL;
        }
@@ -2801,6 +2903,9 @@
        }
        imp_sth->ph = NULL;
 
+       if (imp_dbh->async_sth)
+               imp_dbh->async_sth = NULL;
+
        DBIc_IMPSET_off(imp_sth); /* let DBI know we've done it */
 
 } /* end of dbd_st_destroy */
@@ -3296,6 +3401,295 @@
 
 } /* end of dbd_st_blob_read */
 
+
+
+/* ================================================================== */
+/* Return the result of an asynchronous query, waiting if needed */
+int dbdpg_result (h, imp_dbh)
+                SV *h;
+                imp_dbh_t *imp_dbh;
+{
+
+       PGresult *result;
+       ExecStatusType status = PGRES_FATAL_ERROR;
+       int rows;
+       char *cmdStatus = NULL;
+
+       if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
dbdpg_result\n"); }
+
+       if (1 != imp_dbh->async_status) {
+               pg_error(h, PGRES_FATAL_ERROR, "No asynchronous query is 
running\n");
+               return -2;
+       }       
+
+       imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise 
*/
+
+       while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
+         /* TODO: Better multiple result-set handling */
+         status = _sqlstate(imp_dbh, result);
+         switch (status) {
+         case PGRES_TUPLES_OK:
+               rows = PQntuples(result);
+
+               if (imp_dbh->async_sth) {
+                 imp_dbh->async_sth->cur_tuple = 0;
+                 DBIc_NUM_FIELDS(imp_dbh->async_sth) = PQnfields(result);
+                 DBIc_ACTIVE_on(imp_dbh->async_sth);
+               }
+
+               break;
+         case PGRES_COMMAND_OK:
+               /* non-select statement */
+               cmdStatus = PQcmdStatus(result);
+               if ((0==strncmp(cmdStatus, "DELETE", 6)) || 
(0==strncmp(cmdStatus, "INSERT", 6)) || 
+                       (0==strncmp(cmdStatus, "UPDATE", 6))) {
+                 rows = atoi(PQcmdTuples(result));
+               }
+               break;
+         case PGRES_COPY_OUT:
+         case PGRES_COPY_IN:
+               /* Copy Out/In data transfer in progress */
+               imp_dbh->copystate = status;
+               rows = -1;
+               break;
+         case PGRES_EMPTY_QUERY:
+         case PGRES_BAD_RESPONSE:
+         case PGRES_NONFATAL_ERROR:
+               rows = -2;
+               pg_error(h, status, PQerrorMessage(imp_dbh->conn));
+               break;
+         case PGRES_FATAL_ERROR:
+         default:
+               rows = -2;
+               pg_error(h, status, PQerrorMessage(imp_dbh->conn));
+               break;
+         }
+
+         if (imp_dbh->async_sth) {
+               if (imp_dbh->async_sth->result) /* For potential multi-result 
sets */
+                 PQclear(imp_dbh->async_sth->result);
+               imp_dbh->async_sth->result = result;
+         }
+         else {
+                 PQclear(result);
+         }
+       }
+
+       if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
dbdpg_result returning %d\n", rows); }
+       if (imp_dbh->async_sth) {
+         imp_dbh->async_sth->rows = rows;
+         imp_dbh->async_sth->async_status = 0;
+       }
+       imp_dbh->async_status = 0;
+       return rows;
+
+} /* end of dbdpg_result */
+
+
+/* 
+==================================================================
+Indicates if an asynchronous query has finished yet
+Accepts either a database or a statement handle
+Returns:
+  -1 if no query is running (and raises an exception)
+  +1 if the query is finished
+   0 if the query is still running
+  -2 for other errors
+==================================================================
+*/
+
+int dbdpg_ready (h, imp_dbh)
+                SV *h;
+                imp_dbh_t *imp_dbh;
+{
+
+       if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
pg_st_ready\n"); }
+
+       if (0 == imp_dbh->async_status) {
+               pg_error(h, PGRES_FATAL_ERROR, "No asynchronous query is 
running\n");
+               return -1;
+       }       
+
+       if (!PQconsumeInput(imp_dbh->conn)) {
+         pg_error(h, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn));
+         return -2;
+       }
+
+       return ! PQisBusy(imp_dbh->conn);
+
+} /* end of dbdpg_ready */
+
+
+/*
+Attempt to cancel a running asynchronous query
+Returns true if the cancel succeeded, and false if it did not
+If it did successfully cancel the query, it will also do a rollback.
+Note that queries which have finished do not cause a rollback.
+In this case, pg_cancel will return false.
+NOTE: We only return true if we cancelled and rolled back!
+*/
+
+int dbdpg_cancel(h, imp_dbh)
+        SV *h;
+        imp_dbh_t *imp_dbh;
+{
+
+       PGcancel *cancel;
+       char errbuf[256];
+       PGresult *result;
+       ExecStatusType status;
+
+       if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
dbdpg_cancel, async=%d\n", imp_dbh->async_status); }
+
+       if (0 == imp_dbh->async_status) {
+         pg_error(h, PGRES_FATAL_ERROR, "No asynchronous query is running");
+         return DBDPG_FALSE;
+       }
+
+       if (-1 == imp_dbh->async_status) {
+         pg_error(h, PGRES_FATAL_ERROR, "Asychronous query has already been 
cancelled");
+         return DBDPG_FALSE;
+       }
+
+       /* Get the cancel structure */
+       cancel = PQgetCancel(imp_dbh->conn);
+
+       /* This almost always works. If not, free our structure and complain 
looudly */
+       if (! PQcancel(cancel,errbuf,sizeof(errbuf))) {
+         PQfreeCancel(cancel);
+         if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
PQcancel failed: %s\n", errbuf); }
+         pg_error(h, PGRES_FATAL_ERROR, "PQcancel failed");
+         return DBDPG_FALSE;
+       }
+       PQfreeCancel(cancel);
+
+       /* Whatever else happens, we should no longer be inside of an async 
query */
+       imp_dbh->async_status = -1;
+       if (imp_dbh->async_sth)
+         imp_dbh->async_sth->async_status = -1;
+
+       /* Read in the result - assume only one */
+       result = PQgetResult(imp_dbh->conn);
+       if (!result) {
+         pg_error(h, PGRES_FATAL_ERROR, "Failed to get a result after 
PQcancel");
+         return DBDPG_FALSE;
+       }
+
+       status = _sqlstate(imp_dbh, result);
+
+       /* If we actually cancelled a running query, perform a rollback */
+       if (0 == strncmp(imp_dbh->sqlstate, "57014", 5)) {
+         if (dbis->debug >= 0) { (void)PerlIO_printf(DBILOGFP, "dbdpg: Rolling 
back after cancelled query\n"); }
+         dbd_db_rollback(h, imp_dbh);
+         //      PQexec(imp_dbh->conn, "ROLLBACK");
+         return DBDPG_TRUE;
+       }
+
+       /* If we got any other error, make sure we report it */
+       if (0 != strncmp(imp_dbh->sqlstate, "00000", 5)) {
+         if (dbis->debug >= 0) { (void)PerlIO_printf(DBILOGFP, "dbdpg: Query 
was not cancelled: was already finished\n"); }
+         pg_error(h, status, PQerrorMessage(imp_dbh->conn));
+       }
+       
+       return DBDPG_FALSE;
+
+} /* end of dbdpg_cancel */
+
+
+int dbdpg_cancel_sth(sth, imp_sth)
+        SV *sth;
+        imp_sth_t *imp_sth;
+{
+
+    D_imp_dbh_from_sth;
+       bool cancel_result;
+
+       cancel_result = dbdpg_cancel(sth, imp_dbh);
+
+       dbd_st_finish(sth, imp_sth);
+
+       return cancel_result;
+
+} /* end of dbdpg_cancel */
+
+
+/*
+Finish up an existing async query, either by cancelling it,
+or by waiting for a result.
+
+ */
+static int handle_old_async(SV * handle, imp_dbh_t * imp_dbh, int asyncflag)
+{
+
+  PGresult *result;
+  ExecStatusType status;
+
+  if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
handle_old_sync flag=%d\n", asyncflag); }
+
+  if (asyncflag & DBDPG_OLDQUERY_CANCEL) {
+       /* Cancel the outstanding query */
+       if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
Cancelling old async command\n"); }
+       if (PQisBusy(imp_dbh->conn)) {
+         PGcancel *cancel;
+         char errbuf[256];
+         int cresult;
+         if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
Attempting to cancel query\n"); }
+         cancel = PQgetCancel(imp_dbh->conn);
+         cresult = PQcancel(cancel,errbuf,255);
+         if (! cresult) {
+               if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg: 
PQcancel failed: %s\n", errbuf); }
+               pg_error(handle, PGRES_FATAL_ERROR, "Could not cancel previous 
command");
+               return -2;
+         }
+         PQfreeCancel(cancel);
+         /* Suck up the cancellation notice */
+         while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
+         }
+         /* We need to rollback! - reprepare!? */
+         PQexec(imp_dbh->conn, "rollback");
+         imp_dbh->done_begin = DBDPG_FALSE;
+       }
+  }
+  else if (asyncflag & DBDPG_OLDQUERY_WAIT || imp_dbh->async_status == -1) {
+       /* Finish up the outstanding query and throw out the result, unless an 
error */
+       if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg: Waiting 
for old async command to finish\n"); }
+       while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
+         status = _sqlstate(imp_dbh, result);
+         PQclear(result);
+         if (status == PGRES_COPY_IN) { /* In theory, this should be caught by 
copystate, but we'll be careful */
+               if (-1 == PQputCopyEnd(imp_dbh->conn, NULL)) {
+                 pg_error(handle, PGRES_FATAL_ERROR, 
PQerrorMessage(imp_dbh->conn));
+                 return -2;
+               }
+         }
+         else if (status == PGRES_COPY_OUT) { /* Won't be as nice with this 
one */
+               pg_error(handle, PGRES_FATAL_ERROR, "Must finish copying 
first");
+               return -2;
+         }
+         else if (status != PGRES_EMPTY_QUERY
+                          && status != PGRES_COMMAND_OK
+                          && status != PGRES_TUPLES_OK) {
+               pg_error(handle, status, PQerrorMessage(imp_dbh->conn));
+               return -2;
+         }
+       }
+  }
+  else {
+       pg_error(handle, PGRES_FATAL_ERROR, "Cannot execute until previous 
async query has finished");
+       return -2;
+  }
+
+  /* If we made it this far, safe to assume there is no running query */
+  imp_dbh->async_status = 0;
+  if (imp_dbh->async_sth)
+       imp_dbh->async_sth->async_status = 0;
+
+  return 0;
+
+} /* end of handle_old_async */
+
+
+
 /*
 Some information to keep you sane:
 typedef enum

Modified: DBD-Pg/trunk/dbdimp.h
==============================================================================
--- DBD-Pg/trunk/dbdimp.h       (original)
+++ DBD-Pg/trunk/dbdimp.h       Thu Jul 12 07:30:57 2007
@@ -33,7 +33,9 @@
        int     copystate;         /* 0=none PGRES_COPY_IN PGRES_COPY_OUT */
        int     pg_errorlevel;     /* PQsetErrorVerbosity. Set by user, 
defaults to 1 */
        int     server_prepare;    /* do we want to use PQexecPrepared? 0=no 
1=yes 2=smart. Can be changed by user */
+       int     async_status;      /* 0=no async 1=async started -1=async has 
been cancelled */
 
+    imp_sth_t *async_sth;      /* current async statement handle */
        AV      *savepoints;       /* list of savepoints */
        PGconn  *conn;             /* connection structure */
        char    *sqlstate;         /* from the last result */
@@ -76,6 +78,8 @@
        int    numbound;         /* how many placeholders were explicitly bound 
by the client, not us */
        int    cur_tuple;        /* current tuple being fetched */
        int    rows;             /* number of affected rows */
+       int    async_flag;       /* async? 0=no 1=async 2=cancel 4=wait */
+       int    async_status;     /* 0=no async 1=async started -1=async has 
been cancelled */
 
        STRLEN totalsize;        /* total string length of the statement (with 
no placeholders)*/
 
@@ -122,7 +126,11 @@
 int pg_db_lo_unlink (SV *dbh, unsigned int lobjId);
 unsigned int pg_db_lo_import (SV *dbh, char *filename);
 int pg_db_lo_export (SV *dbh, unsigned int lobjId, char *filename);
-int pg_quickexec (SV *dbh, const char *sql);
+int pg_quickexec (SV *dbh, const char *sql, int asyncflag);
+int dbdpg_ready (SV *dbh, imp_dbh_t *imp_dbh);
+int dbdpg_result (SV *dbh, imp_dbh_t *imp_dbh);
+int dbdpg_cancel (SV *h, imp_dbh_t *imp_dbh);
+int dbdpg_cancel_sth (SV *sth, imp_sth_t *imp_sth);
 
 /* end of dbdimp.h */
 

Added: DBD-Pg/trunk/t/08async.t
==============================================================================
--- (empty file)
+++ DBD-Pg/trunk/t/08async.t    Thu Jul 12 07:30:57 2007
@@ -0,0 +1,385 @@
+#!perl -w
+
+# Test asynchronous queries
+
+#use Test::More qw/no_plan/;
+use Test::More;
+use Time::HiRes qw/sleep/;
+use DBI;
+use DBD::Pg ':async';
+
+use strict;
+$|=1;
+
+if (defined $ENV{DBI_DSN}) {
+       plan tests => 67;
+}
+else {
+       plan skip_all => 'Cannot run test unless DBI_DSN is defined. See the 
README file';
+}
+
+my $dbh = DBI->connect($ENV{DBI_DSN}, $ENV{DBI_USER}, $ENV{DBI_PASS},
+                                          {RaiseError => 1, PrintError => 0, 
AutoCommit => 0});
+ok( defined $dbh, "Connect to database for async testing");
+
+my ($t,$sth,$count,$res,$expected,@data);
+my $pglibversion = $dbh->{pg_lib_version};
+my $pgversion = $dbh->{pg_server_version};
+my $table = 'dbd_pg_test1';
+
+
+## First, test out do() in all its variants
+
+$t=q{Method do() works as expected with no args };
+eval {
+       $res = $dbh->do("SELECT 123");
+};
+is($@, q{}, $t);
+is($res, 1, $t);
+
+$t=q{Method do() works as expected with an unused attribute };
+eval {
+       $res = $dbh->do("SELECT 123", {pg_nosuch => 'arg'});
+};
+is($@, q{}, $t);
+is($res, 1, $t);
+
+$t=q{Method do() works as expected with an unused attribute and a non-prepared 
param };
+eval {
+       $res = $dbh->do("SET random_page_cost TO ?", undef, '2.2');
+};
+is($@, q{}, $t);
+is($res, '0E0', $t);
+
+$t=q{Method do() works as expected with an unused attribute and multiple real 
bind params };
+eval {
+       $res = $dbh->do("SELECT count(*) FROM pg_class WHERE reltuples IN 
(?,?,?)", undef, 1,2,3);
+};
+is($@, q{}, $t);
+is($res, 1, $t);
+
+$t=q{Cancelling a non-async do() query gives an error };
+eval {
+       $res = $dbh->pg_cancel();
+};
+like($@, qr{No asynchronous query is running}, $t);
+
+$t=q{Method do() works as expected with an asychronous flag };
+eval {
+       $res = $dbh->do("SELECT 123", {pg_async => DBDPG_ASYNC});
+};
+is($@, q{}, $t);
+is($res, '0E0', $t);
+
+$t=q{Database attribute "async_status" returns 1 after async query};
+$res = $dbh->{pg_async_status};
+is($res, +1, $t);
+
+$t=q{Cancelling an async do() query works };
+eval {
+       $res = $dbh->pg_cancel();
+};
+is($@, q{}, $t);
+
+$t=q{Database method pg_cancel returns a false value when cancellation works 
but finished};
+is($res, q{}, $t);
+
+$t=q{Database attribute "async_status" returns -1 after pg_cancel};
+$res = $dbh->{pg_async_status};
+is($res, -1, $t);
+
+$t=q{Running do() after a cancelled query works};
+eval {
+       $res = $dbh->do("SELECT 123");
+};
+is($@, q{}, $t);
+
+$t=q{Database attribute "async_status" returns 0 after normal query run};
+$res = $dbh->{pg_async_status};
+is($res, 0, $t);
+
+$t=q{Method pg_ready() fails after a non-async query};
+eval {
+       $dbh->pg_ready();
+};
+like($@, qr{No async}, $t);
+
+$res = $dbh->do("SELECT 123", {pg_async => DBDPG_ASYNC});
+$t=q{Method pg_ready() works after a non-async query};
+## Sleep a sub-second to make sure the server has caught up
+sleep 0.2;
+eval {
+       $res = $dbh->pg_ready();
+};
+is($@, q{}, $t);
+
+$t=q{Database method pg_ready() returns 1 after a completed async do()};
+is($res, 1, $t);
+
+$res = $dbh->pg_ready();
+$t=q{Database method pg_ready() returns true when called a second time};
+is($res, 1, $t);
+
+$t=q{Database method pg_ready() returns 1 after a completed async do()};
+is($res, 1, $t);
+$t=q{Cancelling an async do() query works };
+eval {
+       $res = $dbh->pg_cancel();
+};
+is($@, q{}, $t);
+$t=q{Database method pg_cancel() returns expected false value for completed 
value};
+is($res, q{}, $t);
+
+$t=q{Method do() runs after pg_cancel has cleared the async query};
+eval {
+       $dbh->do("SELECT 456");
+};
+is($@, q{}, $t);
+
+$dbh->do("SELECT 'async2'", {pg_async => DBDPG_ASYNC});
+
+$t=q{Method do() fails when async query has not been cleared};
+eval {
+       $dbh->do("SELECT 'async_blocks'");
+};
+like($@, qr{previous async}, $t);
+
+$t=q{Database method pg_result works as expected};
+eval {
+       $res = $dbh->pg_result();
+};
+is($@, q{}, $t);
+
+$t=q{Database method pg_result() returns correct value};
+is($res, 1, $t);
+
+$t=q{Database method pg_result() fails when called twice};
+eval {
+       $dbh->pg_result();
+};
+like($@, qr{No async}, $t);
+
+$t=q{Database method pg_cancel() fails when called after pg_result()};
+eval {
+       $dbh->pg_cancel();
+};
+like($@, qr{No async}, $t);
+
+$t=q{Database method pg_ready() fails when called after pg_result()};
+eval {
+       $dbh->pg_ready();
+};
+like($@, qr{No async}, $t);
+
+$t=q{Database method do() works after pg_result()};
+eval {
+       $dbh->do("SELECT 123");
+};
+is($@, q{}, $t);
+
+SKIP: {
+
+
+       if ($pgversion < 80000) {
+               skip "Need pg_sleep() to perform rest of async tests: your 
Postgres is too old", 10;
+       }
+
+       my $time = time();
+       $res = $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+       $time = time()-$time;
+       $t = qq{Database method do() returns right away when in async mode};
+       cmp_ok($time, '<=', 1, $t);
+
+       $t=q{Method pg_ready() returns false when query is still running};
+       $res = $dbh->pg_ready();
+       ok(!$res, $t);
+
+       pass("Sleeping to allow query to finish");
+       sleep(2);
+       $t=q{Method pg_ready() returns true when query is finished};
+       $res = $dbh->pg_ready();
+       ok($res, $t);
+
+       $t=q{Method do() will not work if async query not yet cleared};
+       eval {
+               $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+       };
+       like($@, qr{previous async}, $t);
+
+       $t=q{Database method pg_cancel() works while async query is running};
+       eval {
+               $res = $dbh->pg_cancel();
+       };
+       is($@, q{}, $t);
+       $t=q{Database method pg_cancel returns false when query has already 
finished};
+       ok(!$res,$t);
+
+       $t=q{Database method pg_result() fails after async query has been 
cancelled};
+       eval {
+               $res = $dbh->pg_result();
+       };
+       like($@, qr{No async}, $t);
+
+       $t=q{Database method do() cancels the previous async when requested};
+       eval {
+               $res = $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC 
+ DBDPG_OLDQUERY_CANCEL});
+       };
+       is($@, q{}, $t);
+
+       $t=q{Database method pg_result works when async query is still running};
+       eval {
+               $res = $dbh->pg_result();
+       };
+       is($@, q{}, $t);
+
+       ## Now throw in some execute after the do()
+       $sth = $dbh->prepare("SELECT 567");
+
+       $t = q{Running execute after async do() gives an error};
+       $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+       eval {
+               $res = $sth->execute();
+       };
+       like($@, qr{previous async}, $t);
+
+       $t = q{Running execute after async do() works when told to cancel};
+       $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_OLDQUERY_CANCEL});
+       eval {
+               $sth->execute();
+       };
+       is($@, q{}, $t);
+
+       $t = q{Running execute after async do() works when told to wait};
+       $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+       $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_OLDQUERY_WAIT});
+       eval {
+               $sth->execute();
+       };
+       is($@, q{}, $t);
+
+       $sth->finish();
+
+}; ## end of pg_sleep skip
+
+
+$t=q{Method execute() works when prepare has DBDPG_ASYNC flag};
+$sth = $dbh->prepare("SELECT 123", {pg_async => DBDPG_ASYNC});
+eval {
+       $sth->execute();
+};
+is($@, q{}, $t);
+
+$t=q{Database attribute "async_status" returns 1 after prepare async};
+$res = $dbh->{pg_async_status};
+is($res, 1, $t);
+
+$t=q{Method do() fails when previous async prepare has been executed};
+eval {
+       $dbh->do("SELECT 123");
+};
+like($@, qr{previous async}, $t);
+
+$t=q{Method execute() fails when previous async prepare has been executed};
+eval {
+       $sth->execute();
+};
+like($@, qr{previous async}, $t);
+
+$t=q{Database method pg_cancel works if async query has already finished};
+sleep 0.5;
+eval {
+       $res = $sth->pg_cancel();
+};
+is($@, q{}, $t);
+
+$t=q{Statement method pg_cancel() returns a false value when cancellation 
works but finished};
+is($res, q{}, $t);
+
+$t=q{Method do() fails when previous execute async has not been cleared};
+$sth->execute();
+$sth->finish(); ## Ideally, this would clear out the async, but it cannot at 
the moment
+eval {
+       $dbh->do("SELECT 345");
+};
+like($@, qr{previous async}, $t);
+
+$dbh->pg_cancel;
+
+$t=q{Directly after pg_cancel(), pg_async_status is -1};
+is($dbh->{pg_async_status}, -1, $t);
+
+$t=q{Method execute() works when prepare has DBDPG_ASYNC flag};
+$sth->execute();
+
+$t=q{After async execute, pg_async_status is 1};
+is($dbh->{pg_async_status}, 1, $t);
+
+$t=q{Method pg_result works after a prepare/execute call};
+eval {
+       $res = $dbh->pg_result;
+};
+is($@, q{}, $t);
+
+$t=q{Method pg_result() returns expected result after prepare/execute select};
+is($res, 1, $t);
+
+$t=q{Method fetchall_arrayref works after pg_result};
+eval {
+       $res = $sth->fetchall_arrayref();
+};
+is($@, q{}, $t);
+
+$t=q{Method fetchall_arrayref returns correct result after pg_result};
+is_deeply($res, [[123]], $t);
+
+eval {
+       $dbh->do("DROP TABLE dbdpg_async_test");
+};
+$dbh->do("CREATE TABLE dbdpg_async_test(id INT, t TEXT)");
+$dbh->commit();
+$sth->execute();
+
+$t=q{Method prepare() works when passed in DBDPG_OLDQUERY_CANCEL};
+
+my $sth2;
+my $SQL = "INSERT INTO dbdpg_async_test(id) SELECT 123 UNION SELECT 456";
+eval {
+       $sth2 = $dbh->prepare($SQL, {pg_async => DBDPG_ASYNC + 
DBDPG_OLDQUERY_CANCEL});
+};
+is($@, q{}, $t);
+
+$t=q{Fetch on cancelled statement handle fails};
+eval {
+       $sth->fetch();
+};
+like($@, qr{no statement executing}, $t);
+
+$t=q{Method execute works after async + cancel prepare};
+eval {
+       $sth2->execute();
+};
+is($@, q{}, $t);
+
+$t=q{Statement method pg_result works on async statement handle};
+eval {
+       $res = $sth2->pg_result();
+};
+is($@, q{}, $t);
+
+$t=q{Statement method pg_result returns correct result after execute};
+is($res, 2, $t);
+
+$sth2->execute();
+
+$t=q{Database method pg_result works on async statement handle};
+eval {
+       $res = $sth2->pg_result();
+};
+is($@, q{}, $t);
+$t=q{Database method pg_result returns correct result after execute};
+is($res, 2, $t);
+
+## TODO: More pg_sleep tests with execute
+
+ok( $dbh->disconnect(), 'Disconnect from database');
+

Reply via email to