Author: turnstep
Date: Thu Jul 12 07:30:57 2007
New Revision: 9728
Added:
DBD-Pg/trunk/t/08async.t
Modified:
DBD-Pg/trunk/Pg.h
DBD-Pg/trunk/Pg.pm
DBD-Pg/trunk/Pg.xs
DBD-Pg/trunk/dbdimp.c
DBD-Pg/trunk/dbdimp.h
Log:
First pass at async support: still a little rough, but a good start.
Modified: DBD-Pg/trunk/Pg.h
==============================================================================
--- DBD-Pg/trunk/Pg.h (original)
+++ DBD-Pg/trunk/Pg.h Thu Jul 12 07:30:57 2007
@@ -17,6 +17,9 @@
#define DBDPG_TRUE (bool)1
#define DBDPG_FALSE (bool)0
+#define DBDPG_ASYNC 1
+#define DBDPG_OLDQUERY_CANCEL 2
+#define DBDPG_OLDQUERY_WAIT 4
#include "libpq-fe.h"
Modified: DBD-Pg/trunk/Pg.pm
==============================================================================
--- DBD-Pg/trunk/Pg.pm (original)
+++ DBD-Pg/trunk/Pg.pm Thu Jul 12 07:30:57 2007
@@ -26,6 +26,7 @@
%EXPORT_TAGS =
(
+ async => [qw(DBDPG_ASYNC DBDPG_OLDQUERY_CANCEL
DBDPG_OLDQUERY_WAIT)],
pg_types => [qw(
PG_BOOL PG_BYTEA PG_CHAR PG_INT8 PG_INT2 PG_INT4
PG_TEXT PG_OID PG_TID
PG_FLOAT4 PG_FLOAT8 PG_ABSTIME PG_RELTIME PG_TINTERVAL
PG_BPCHAR
@@ -40,8 +41,8 @@
sub new { my $self = {}; return bless $self, shift; }
}
$DBDPG_DEFAULT = DBD::Pg::DefaultValue->new();
- Exporter::export_ok_tags('pg_types');
- @EXPORT = qw($DBDPG_DEFAULT);
+ Exporter::export_ok_tags('pg_types', 'async');
+ @EXPORT = qw($DBDPG_DEFAULT DBDPG_ASYNC DBDPG_OLDQUERY_CANCEL
DBDPG_OLDQUERY_WAIT);
require_version DBI 1.45;
@@ -78,17 +79,24 @@
});
+ DBD::Pg::db->install_method("pg_cancel");
DBD::Pg::db->install_method("pg_endcopy");
DBD::Pg::db->install_method("pg_getline");
DBD::Pg::db->install_method("pg_ping");
DBD::Pg::db->install_method("pg_putline");
+ DBD::Pg::db->install_method("pg_ready");
DBD::Pg::db->install_method("pg_release");
+ DBD::Pg::db->install_method("pg_result");
DBD::Pg::db->install_method("pg_rollback_to");
DBD::Pg::db->install_method("pg_savepoint");
DBD::Pg::db->install_method("pg_server_trace");
DBD::Pg::db->install_method("pg_server_untrace");
DBD::Pg::db->install_method("pg_type_info");
+ DBD::Pg::st->install_method("pg_cancel");
+ DBD::Pg::st->install_method("pg_result");
+ DBD::Pg::st->install_method("pg_ready");
+
$drh;
}
@@ -3085,6 +3093,643 @@
=back
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other
work while the query is
+running on the backend. Both queries sent by the do() method, and by the
execute() method can be
+sent asynchronously. The basic usage is as follows:
+
+ use DBD::Pg ':async';
+
+ print "Async do() example:\n";
+ $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+ do_something_else();
+ {
+ if ($dbh->pg_ready()) {
+ $res = $pg_result();
+ print "Result of do(): $res\n";
+ }
+ print "Query is still running...\n";
+ if (cancel_request_received) {
+ $dbh->pg_cancel();
+ }
+ sleep 1;
+ redo;
+ }
+
+ print "Async prepare/execute example:\n";
+ $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+ $sth->execute();
+
+ ## Changed our mind, cancel and run again:
+ $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC +
DBDPG_OLDQUERY_CANCEL});
+ $sth->execute();
+
+ do_something_else();
+
+ if (!$sth->pg_ready) {
+ do_another_thing();
+ }
+
+ ## We wait until it is done, and get the result:
+ $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can
import all of them by putting
+either of these at the top of your script:
+
+ use DBD::Pg;
+
+ use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants
is recommended as it
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the
prepare() method as a value
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the
prepare method(), it causes any
+currently running asynchronous query to be cancelled and rolled back. It has
no effect if no asynchronous
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the
prepare method(), it waits for any
+currently running asynchronous query to complete. It has no effect if no
asynchronous query is currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running
asynchronous query. It returns true if
+the cancel suceeded, and false otherwise. Note that a query that has finished
before this method is executed
+will also return false. B<Warning>: a successful cancellation will leave the
database in an unusable state,
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+ $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as
a statement handle method. Both simply
+see if a previously issued asycnhronous query has completed yet. It returns
true if the statement has finished, in which
+case you should then call the pg_result() method. Calls to <pg_ready() should
only be used when you have other
+things to do while the query is running. If you simply want to wait until the
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+ my $time = 0;
+ while (!$dbh->pg_ready) {
+ print "Query is still running. Seconds: $time\n";
+ $time++;
+ sleep 1;
+ }
+ $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued
asynchronous query. If the query is still
+running, this method will wait until it has finished. The result returned is
the number of rows: the same thing
+that would have been returned by the asynchronous do() or execute() if it had
been without an
+asychronous flag.
+
+ $result = $dbh->pg_result;
+
+=back
+
+=head2 Savepoints
+
+PostgreSQL version 8.0 introduced the concept of savepoints, which allows
+transactions to be rolled back to a certain point without affecting the
+rest of the transaction. DBD::Pg encourages using the following methods to
+control savepoints:
+
+=over 4
+
+=item B<pg_savepoint>
+
+Creates a savepoint. This will fail unless you are inside of a transaction.
The
+only argument is the name of the savepoint. Note that PostgreSQL DOES allow
+multiple savepoints with the same name to exist.
+
+ $dbh->pg_savepoint("mysavepoint");
+
+=item B<pg_rollback_to>
+
+Rolls the database back to a named savepoint, discarding any work performed
after
+that point. If more than one savepoint with that name exists, rolls back to
the
+most recently created one.
+
+ $dbh->pg_rollback_to("mysavepoint");
+
+=item B<pg_release>
+
+Releases (or removes) a named savepoint. If more than one savepoint with that
name
+exists, it will only destroy the most recently created one. Note that all
savepoints
+created after the one being released are also destroyed.
+
+ $dbh->pg_release("mysavepoint");
+
+=back
+
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other
work while the query is
+running on the backend. Both queries sent by the do() method, and by the
execute() method can be
+sent asynchronously. The basic usage is as follows:
+
+ use DBD::Pg ':async';
+
+ print "Async do() example:\n";
+ $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+ do_something_else();
+ {
+ if ($dbh->pg_ready()) {
+ $res = $pg_result();
+ print "Result of do(): $res\n";
+ }
+ print "Query is still running...\n";
+ if (cancel_request_received) {
+ $dbh->pg_cancel();
+ }
+ sleep 1;
+ redo;
+ }
+
+ print "Async prepare/execute example:\n";
+ $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+ $sth->execute();
+
+ ## Changed our mind, cancel and run again:
+ $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC +
DBDPG_OLDQUERY_CANCEL});
+ $sth->execute();
+
+ do_something_else();
+
+ if (!$sth->pg_ready) {
+ do_another_thing();
+ }
+
+ ## We wait until it is done, and get the result:
+ $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can
import all of them by putting
+either of these at the top of your script:
+
+ use DBD::Pg;
+
+ use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants
is recommended as it
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the
prepare() method as a value
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the
prepare method(), it causes any
+currently running asynchronous query to be cancelled and rolled back. It has
no effect if no asynchronous
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the
prepare method(), it waits for any
+currently running asynchronous query to complete. It has no effect if no
asynchronous query is currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running
asynchronous query. It returns true if
+the cancel suceeded, and false otherwise. Note that a query that has finished
before this method is executed
+will also return false. B<Warning>: a successful cancellation will leave the
database in an unusable state,
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+ $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as
a statement handle method. Both simply
+see if a previously issued asycnhronous query has completed yet. It returns
true if the statement has finished, in which
+case you should then call the pg_result() method. Calls to <pg_ready() should
only be used when you have other
+things to do while the query is running. If you simply want to wait until the
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+ my $time = 0;
+ while (!$dbh->pg_ready) {
+ print "Query is still running. Seconds: $time\n";
+ $time++;
+ sleep 1;
+ }
+ $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued
asynchronous query. If the query is still
+running, this method will wait until it has finished. The result returned is
the number of rows: the same thing
+that would have been returned by the asynchronous do() or execute() if it had
been without an
+asychronous flag.
+
+ $result = $dbh->pg_result;
+
+=back
+
+=head2 Savepoints
+
+PostgreSQL version 8.0 introduced the concept of savepoints, which allows
+transactions to be rolled back to a certain point without affecting the
+rest of the transaction. DBD::Pg encourages using the following methods to
+control savepoints:
+
+=over 4
+
+=item B<pg_savepoint>
+
+Creates a savepoint. This will fail unless you are inside of a transaction.
The
+only argument is the name of the savepoint. Note that PostgreSQL DOES allow
+multiple savepoints with the same name to exist.
+
+ $dbh->pg_savepoint("mysavepoint");
+
+=item B<pg_rollback_to>
+
+Rolls the database back to a named savepoint, discarding any work performed
after
+that point. If more than one savepoint with that name exists, rolls back to
the
+most recently created one.
+
+ $dbh->pg_rollback_to("mysavepoint");
+
+=item B<pg_release>
+
+Releases (or removes) a named savepoint. If more than one savepoint with that
name
+exists, it will only destroy the most recently created one. Note that all
savepoints
+created after the one being released are also destroyed.
+
+ $dbh->pg_release("mysavepoint");
+
+=back
+
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other
work while the query is
+running on the backend. Both queries sent by the do() method, and by the
execute() method can be
+sent asynchronously. The basic usage is as follows:
+
+ use DBD::Pg ':async';
+
+ print "Async do() example:\n";
+ $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+ do_something_else();
+ {
+ if ($dbh->pg_ready()) {
+ $res = $pg_result();
+ print "Result of do(): $res\n";
+ }
+ print "Query is still running...\n";
+ if (cancel_request_received) {
+ $dbh->pg_cancel();
+ }
+ sleep 1;
+ redo;
+ }
+
+ print "Async prepare/execute example:\n";
+ $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+ $sth->execute();
+
+ ## Changed our mind, cancel and run again:
+ $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC +
DBDPG_OLDQUERY_CANCEL});
+ $sth->execute();
+
+ do_something_else();
+
+ if (!$sth->pg_ready) {
+ do_another_thing();
+ }
+
+ ## We wait until it is done, and get the result:
+ $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can
import all of them by putting
+either of these at the top of your script:
+
+ use DBD::Pg;
+
+ use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants
is recommended as it
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the
prepare() method as a value
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the
prepare method(), it causes any
+currently running asynchronous query to be cancelled and rolled back. It has
no effect if no asynchronous
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the
prepare method(), it waits for any
+currently running asynchronous query to complete. It has no effect if no
asynchronous query is currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running
asynchronous query. It returns true if
+the cancel suceeded, and false otherwise. Note that a query that has finished
before this method is executed
+will also return false. B<Warning>: a successful cancellation will leave the
database in an unusable state,
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+ $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as
a statement handle method. Both simply
+see if a previously issued asycnhronous query has completed yet. It returns
true if the statement has finished, in which
+case you should then call the pg_result() method. Calls to <pg_ready() should
only be used when you have other
+things to do while the query is running. If you simply want to wait until the
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+ my $time = 0;
+ while (!$dbh->pg_ready) {
+ print "Query is still running. Seconds: $time\n";
+ $time++;
+ sleep 1;
+ }
+ $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued
asynchronous query. If the query is still
+running, this method will wait until it has finished. The result returned is
the number of rows: the same thing
+that would have been returned by the asynchronous do() or execute() if it had
been without an
+asychronous flag.
+
+ $result = $dbh->pg_result;
+
+=back
+
+=head2 Savepoints
+
+PostgreSQL version 8.0 introduced the concept of savepoints, which allows
+transactions to be rolled back to a certain point without affecting the
+rest of the transaction. DBD::Pg encourages using the following methods to
+control savepoints:
+
+=over 4
+
+=item B<pg_savepoint>
+
+Creates a savepoint. This will fail unless you are inside of a transaction.
The
+only argument is the name of the savepoint. Note that PostgreSQL DOES allow
+multiple savepoints with the same name to exist.
+
+ $dbh->pg_savepoint("mysavepoint");
+
+=item B<pg_rollback_to>
+
+Rolls the database back to a named savepoint, discarding any work performed
after
+that point. If more than one savepoint with that name exists, rolls back to
the
+most recently created one.
+
+ $dbh->pg_rollback_to("mysavepoint");
+
+=item B<pg_release>
+
+Releases (or removes) a named savepoint. If more than one savepoint with that
name
+exists, it will only destroy the most recently created one. Note that all
savepoints
+created after the one being released are also destroyed.
+
+ $dbh->pg_release("mysavepoint");
+
+=back
+
+=head2 Asynchronous Queries
+
+It is possible to send a query to the backend and have your script do other
work while the query is
+running on the backend. Both queries sent by the do() method, and by the
execute() method can be
+sent asynchronously. The basic usage is as follows:
+
+ use DBD::Pg ':async';
+
+ print "Async do() example:\n";
+ $dbh->do("SELECT long_running_query()", {pg_async => DBDPG_ASYNC});
+ do_something_else();
+ {
+ if ($dbh->pg_ready()) {
+ $res = $pg_result();
+ print "Result of do(): $res\n";
+ }
+ print "Query is still running...\n";
+ if (cancel_request_received) {
+ $dbh->pg_cancel();
+ }
+ sleep 1;
+ redo;
+ }
+
+ print "Async prepare/execute example:\n";
+ $sth = $dbh->prepare("SELECT long_running_query(1)", {pg_async => ASYNC});
+ $sth->execute();
+
+ ## Changed our mind, cancel and run again:
+ $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_ASYNC +
DBDPG_OLDQUERY_CANCEL});
+ $sth->execute();
+
+ do_something_else();
+
+ if (!$sth->pg_ready) {
+ do_another_thing();
+ }
+
+ ## We wait until it is done, and get the result:
+ $res = $dbh->pg_result();
+
+=head3 Asynchronous Constants
+
+There are currently three asynchronous constants exported by DBD::Pg. You can
import all of them by putting
+either of these at the top of your script:
+
+ use DBD::Pg;
+
+ use DBD::Pg ':async';
+
+You may also use the numbers instead of the constants, but using the constants
is recommended as it
+makes your script more readable.
+
+=over 4
+
+=item DBDPG_ASYNC
+
+This is a constant for the number 1. It is passed to either the do() or the
prepare() method as a value
+to the pg_async key and indicates that the query should be sent asynchronously.
+
+=item DBDPG_OLDQUERY_CANCEL
+
+This is a constant for the number 2. When passed to either the do() or the
prepare method(), it causes any
+currently running asynchronous query to be cancelled and rolled back. It has
no effect if no asynchronous
+query is currently running.
+
+=item DBDPG_OLDQUERY_WAIT
+
+This is a constant for the number 4. When passed to either the do() or the
prepare method(), it waits for any
+currently running asynchronous query to complete. It has no effect if there is
no asynchronous query currently running.
+
+=back
+
+=head3 Asynchronous Methods
+
+=over 4
+
+=item pg_cancel
+
+This database-level method attempts to cancel any currently running
asynchronous query. It returns true if
+the cancel suceeded, and false otherwise. Note that a query that has finished
before this method is executed
+will also return false. B<WARNING>: a successful cancellation will leave the
database in an unusable state,
+so DBD::Pg will automatically clear out the error message and issue a ROLLBACK.
+
+ $result = $dbh->pg_cancel();
+
+=item pg_ready
+
+This method can be called as a database handle method or (for convenience) as
a statement handle method. Both simply
+see if a previously issued asycnhronous query has completed yet. It returns
true if the statement has finished, in which
+case you should then call the pg_result() method. Calls to pg_ready() should
only be used when you have other
+things to do while the query is running. If you simply want to wait until the
query is done, do not call pg_ready()
+over and over, but simply call the pg_result() method.
+
+ my $time = 0;
+ while (!$dbh->pg_ready) {
+ print "Query is still running. Seconds: $time\n";
+ $time++;
+ sleep 1;
+ }
+ $result = $dbh->pg_result;
+
+=item pg_result
+
+This database handle method returns the results of a previously issued
asynchronous query. If the query is still
+running, this method will wait until it has finished. The result returned is
the number of rows: the same thing
+that would have been returned by the asynchronous do() or execute() if it had
been called without an asychronous flag.
+
+ $result = $dbh->pg_result;
+
+=back
+
+=head3 Asynchronous Examples
+
+Here are some working examples of asynchronous queries. Note that we'll use
the pg_sleep function to emulate a
+long-running query.
+
+ use strict;
+ use warnings;
+ use Time::HiRes 'sleep';
+ use DBD::Pg ':async';
+
+ my $dbh = DBI->connect('dbi:Pg:dbname=postgres', 'postgres', '',
{AutoCommit=>0,RaiseError=>1});
+
+ ## Kick off a long running query on the first database:
+ my $sth = $dbh->prepare("SELECT pg_sleep(?)", {pg_async => DBDPG_ASYNC});
+ $sth->execute(5);
+
+ ## While that is running, do some other things
+ print "Your query is processing. Thanks for waiting\n";
+ check_on_the_kids(); ## Expensive sub, takes at least three seconds.
+
+ while (!$dbh->pg_ready) {
+ check_on_the_kids();
+ ## If the above function returns quickly for some reason, we add a small
sleep
+ sleep 0.1;
+ }
+
+ print "The query has finished. Gathering results\n";
+ my $result = $sth->pg_result;
+ print "Result: $result\n";
+ my $info = $sth->fetchall_arrayref();
+
+Without asynchronous queries, the above script would take about 8 seconds to
run: five seconds waiting
+for the execute to finish, then three for the check_on_the_kids() function to
return. With asynchronous
+queries, the script takes about 6 seconds to run, and gets in two iterations
of check_on_the_kids in
+the process.
+
+Here's an example showing the ability to cancel a long-running query. Imagine
two slave databases in
+different geographic locations over a slow network. You need information as
quickly as possible, so
+you query both at once. When you get an answer, you tell the other one to stop
working on your query,
+as you don't need it anymore.
+
+ use strict;
+ use warnings;
+ use Time::HiRes 'sleep';
+ use DBD::Pg ':async';
+
+ my $dbhslave1 = DBI->connect('dbi:Pg:dbname=postgres;host=slave1',
'postgres', '', {AutoCommit=>0,RaiseError=>1});
+ my $dbhslave2 = DBI->connect('dbi:Pg:dbname=postgres;host=slave2',
'postgres', '', {AutoCommit=>0,RaiseError=>1});
+
+ $SQL = "SELECT count(*) FROM largetable WHERE flavor='blueberry'";
+
+ my $sth1 = $dbhslave1->prepare($SQL, {pg_async => DBDPG_ASYNC});
+ my $sth2 = $dbhslave2->prepare($SQL, {pg_async => DBDPG_ASYNC});
+
+ $sth1->execute();
+ $sth2->execute();
+
+ my $winner;
+ while (!defined $winner) {
+ if ($sth1->pg_ready) {
+ $winner = 1;
+ }
+ elsif ($sth2->pg_ready) {
+ $winner = 2;
+ }
+ Time::HiRes::sleep 0.05;
+ }
+
+ my $count;
+ if ($winner == 1) {
+ $sth2->pg_cancel();
+ $sth1->pg_result();
+ $count = $sth1->fetchall_arrayref()->[0][0];
+ }
+ else {
+ $sth1->pg_cancel();
+ $sth2->pg_result();
+ $count = $sth2->fetchall_arrayref()->[0][0];
+ }
+
+
=head2 COPY support
DBD::Pg supports the COPY command through three functions: pg_putline,
Modified: DBD-Pg/trunk/Pg.xs
==============================================================================
--- DBD-Pg/trunk/Pg.xs (original)
+++ DBD-Pg/trunk/Pg.xs Thu Jul 12 07:30:57 2007
@@ -61,6 +61,10 @@
PG_OID = 26
PG_TID = 27
+ DBDPG_ASYNC = DBDPG_ASYNC
+ DBDPG_OLDQUERY_CANCEL = DBDPG_OLDQUERY_CANCEL
+ DBDPG_OLDQUERY_WAIT = DBDPG_OLDQUERY_WAIT
+
CODE:
if (0==ix) {
if (!name) {
@@ -171,27 +175,34 @@
CODE:
{
int retval;
+ int asyncflag = 0;
+ SV **svp;
if (strlen(statement)<1) { /* Corner case */
XST_mUNDEF(0);
return;
}
- if (items < 3) { /* No attribs, no arguments */
+ if (attr && SvROK(attr) && SvTYPE(SvRV(attr)) == SVt_PVHV) {
+ if ((svp = hv_fetch((HV*)SvRV(attr),"pg_async", 8, 0))
!= NULL) {
+ asyncflag = SvIV(*svp);
+ }
+ }
+ if (items < 4) { /* No bind arguments */
/* Quick run via PQexec */
- retval = pg_quickexec(dbh, statement);
+ retval = pg_quickexec(dbh, statement, asyncflag);
}
- else { /* The normal, slower way */
+ else { /* We've got bind arguments, so we do the whole
prepare/execute route */
imp_sth_t *imp_sth;
SV * sth = dbixst_bounce_method("prepare", 3);
if (!SvROK(sth))
XSRETURN_UNDEF;
imp_sth = (imp_sth_t*)(DBIh_COM(sth));
- if (items > 3)
- if (!dbdxst_bind_params(sth, imp_sth, items-2,
ax+2))
- XSRETURN_UNDEF;
+ if (!dbdxst_bind_params(sth, imp_sth, items-2, ax+2))
+ XSRETURN_UNDEF;
imp_sth->server_prepare = 1;
imp_sth->onetime = 1; /* Overrides the above at actual
PQexec* decision time */
+ imp_sth->async_flag = asyncflag;
retval = dbd_st_execute(sth, imp_sth);
}
@@ -467,6 +478,34 @@
ST(0) = sv_2mortal( newSViv( type_num ) );
}
+void
+pg_result(dbh)
+ SV * dbh
+ CODE:
+ int ret;
+ D_imp_dbh(dbh);
+ ret = dbdpg_result(dbh, imp_dbh);
+ if (ret == 0)
+ XST_mPV(0, "0E0");
+ else if (ret < -1)
+ XST_mUNDEF(0);
+ else
+ XST_mIV(0, ret);
+
+void
+pg_ready(dbh)
+ SV *dbh
+ CODE:
+ D_imp_dbh(dbh);
+ ST(0) = sv_2mortal(newSViv(dbdpg_ready(dbh, imp_dbh)));
+
+void
+pg_cancel(dbh)
+ SV *dbh
+ CODE:
+ D_imp_dbh(dbh);
+ ST(0) = dbdpg_cancel(dbh, imp_dbh) ? &sv_yes : &sv_no;
+
# -- end of DBD::Pg::db
@@ -482,4 +521,35 @@
D_imp_dbh_from_sth;
ST(0) = strEQ(imp_dbh->sqlstate,"00000") ? &sv_no :
newSVpv(imp_dbh->sqlstate, 5);
+void
+pg_ready(sth)
+ SV *sth
+ CODE:
+ D_imp_sth(sth);
+ D_imp_dbh_from_sth;
+ ST(0) = sv_2mortal(newSViv(dbdpg_ready(sth, imp_dbh)));
+
+void
+pg_cancel(sth)
+ SV *sth
+ CODE:
+ D_imp_sth(sth);
+ ST(0) = dbdpg_cancel_sth(sth, imp_sth) ? &sv_yes : &sv_no;
+
+void
+pg_result(sth)
+ SV * sth
+ CODE:
+ int ret;
+ D_imp_sth(sth);
+ D_imp_dbh_from_sth;
+ ret = dbdpg_result(sth, imp_dbh);
+ if (ret == 0)
+ XST_mPV(0, "0E0");
+ else if (ret < -1)
+ XST_mUNDEF(0);
+ else
+ XST_mIV(0, ret);
+
+
# end of Pg.xs
Modified: DBD-Pg/trunk/dbdimp.c
==============================================================================
--- DBD-Pg/trunk/dbdimp.c (original)
+++ DBD-Pg/trunk/dbdimp.c Thu Jul 12 07:30:57 2007
@@ -68,6 +68,7 @@
static int dbd_st_deallocate_statement(SV *sth, imp_sth_t *imp_sth);
static PGTransactionStatusType dbd_db_txn_status (imp_dbh_t *imp_dbh);
static int pg_db_start_txn (SV *dbh, imp_dbh_t *imp_dbh);
+static int handle_old_async(SV * handle, imp_dbh_t * imp_dbh, int asyncflag);
DBISTATE_DECLARE;
@@ -221,6 +222,8 @@
imp_dbh->prepare_number = 1;
imp_dbh->copystate = 0;
imp_dbh->pg_errorlevel = 1; /* Default */
+ imp_dbh->async_status = 0;
+ imp_dbh->async_sth = NULL;
/* If the server can handle it, we default to "smart", otherwise "off"
*/
imp_dbh->server_prepare = imp_dbh->pg_protocol >= 3 ?
@@ -330,8 +333,7 @@
we are connecting over TCP/IP, only set it here if non-null, and fall
through
to a better default value below.
*/
- if (result
- && NULL != PQresultErrorField(result,PG_DIAG_SQLSTATE)) {
+ if (result && NULL != PQresultErrorField(result,PG_DIAG_SQLSTATE)) {
strncpy(imp_dbh->sqlstate,
PQresultErrorField(result,PG_DIAG_SQLSTATE), 5);
imp_dbh->sqlstate[5] = '\0';
stateset = DBDPG_TRUE;
@@ -534,11 +536,18 @@
/* ================================================================== */
void dbd_db_destroy (SV * dbh, imp_dbh_t * imp_dbh)
{
- if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
dbd_db_destroy\n"); }
+ if (dbis->debug >= 4)
+ (void)PerlIO_printf(DBILOGFP, "dbdpg: dbd_db_destroy\n");
if (DBIc_ACTIVE(imp_dbh))
(void)dbd_db_disconnect(dbh, imp_dbh);
+ if (imp_dbh->async_sth) { /* Just in case */
+ if (imp_dbh->async_sth->result)
+ PQclear(imp_dbh->async_sth->result);
+ imp_dbh->async_sth = NULL;
+ }
+
av_undef(imp_dbh->savepoints);
sv_free((SV *)imp_dbh->savepoints);
Safefree(imp_dbh->sqlstate);
@@ -635,10 +644,12 @@
#endif
break;
- case 15: /* pg_default_port */
+ case 15: /* pg_default_port pg_async_status */
if (strEQ("pg_default_port", key))
retsv = newSViv((IV) PGDEFPORT );
+ else if (strEQ("pg_async_status", key))
+ retsv = newSViv((IV)imp_dbh->async_status);
break;
case 17: /* pg_server_prepare pg_server_version */
@@ -925,9 +936,11 @@
}
break;
- case 8: /* NULLABLE */
+ case 8: /* pg_async NULLABLE */
- if (strEQ("NULLABLE", key)) {
+ if (strEQ("pg_async", key))
+ retsv = newSViv((IV)imp_sth->async_flag);
+ else if (strEQ("NULLABLE", key)) {
AV *av = newAV();
PGresult *result;
int status = -1;
@@ -1038,6 +1051,13 @@
switch (kl) {
+ case 8: /* pg_async */
+
+ if (strEQ("pg_async", key)) {
+ imp_sth->async_flag = SvIV(valuesv);
+ return 1;
+ }
+
case 14: /* pg_prepare_now */
if (strEQ("pg_prepare_now", key)) {
@@ -1094,7 +1114,7 @@
} /* end of dbd_discon_all */
-// XXX IS this really needed die to $dbh->{pg_socket}?
+/* Deprecated in favor of $dbh->{pg_socket} */
/* ================================================================== */
int dbd_db_getfd (SV * dbh, imp_dbh_t * imp_dbh)
{
@@ -1160,6 +1180,8 @@
imp_sth->cur_tuple = 0;
imp_sth->rows = -1; /* per DBI spec */
imp_sth->totalsize = 0;
+ imp_sth->async_flag = 0;
+ imp_sth->async_status = 0;
imp_sth->prepare_name = NULL;
imp_sth->firstword = NULL;
imp_sth->result = NULL;
@@ -1199,6 +1221,9 @@
if ((svp =
hv_fetch((HV*)SvRV(attribs),"pg_placeholder_dollaronly", 25, 0)) != NULL) {
imp_sth->dollaronly = SvTRUE(*svp) ? DBDPG_TRUE :
DBDPG_FALSE;
}
+ if ((svp = hv_fetch((HV*)SvRV(attribs),"pg_async", 8, 0)) !=
NULL) {
+ imp_sth->async_flag = SvIV(*svp);
+ }
}
/* Figure out the first word in the statement */
@@ -1869,9 +1894,10 @@
}
result = PQprepare(imp_dbh->conn, imp_sth->prepare_name,
statement, params, paramTypes);
Safefree(paramTypes);
- if (result)
+ if (result) {
status = PQresultStatus(result);
- PQclear(result);
+ PQclear(result);
+ }
if (dbis->debug >= 6)
(void)PerlIO_printf(DBILOGFP, "dbdpg: Using PQprepare:
%s\n", statement);
}
@@ -2093,7 +2119,7 @@
/* ================================================================== */
-int pg_quickexec (SV * dbh, const char * sql)
+int pg_quickexec (SV * dbh, const char * sql, int asyncflag)
{
D_imp_dbh(dbh);
PGresult * result;
@@ -2102,7 +2128,8 @@
int rows = 0;
if (dbis->debug >= 4)
- (void)PerlIO_printf(DBILOGFP, "dbdpg: pg_quickexec (%s)\n",
sql);
+ (void)PerlIO_printf(DBILOGFP, "dbdpg: dbdpg_quickexec
query=(%s) async=(%d) async_status=(%d)\n",
+ sql, asyncflag,
imp_dbh->async_status);
if (NULL == imp_dbh->conn)
croak("execute on disconnected handle");
@@ -2111,6 +2138,14 @@
if (imp_dbh->copystate!=0)
croak("Must call pg_endcopy before issuing more commands");
+ /* If we are still waiting on an async, handle it */
+ if (imp_dbh->async_status) {
+ if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg: handling
old async\n");
+ rows = handle_old_async(dbh, imp_dbh, asyncflag);
+ if (rows)
+ return rows;
+ }
+
/* If not autocommit, start a new transaction */
if (!imp_dbh->done_begin && !DBIc_has(imp_dbh, DBIcf_AutoCommit)) {
status = _result(imp_dbh, "begin");
@@ -2121,6 +2156,20 @@
imp_dbh->done_begin = DBDPG_TRUE;
}
+ /* Asynchronous commands get kicked off and return undef */
+ if (asyncflag & DBDPG_ASYNC) {
+ if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg: Going
asychronous with do()\n");
+ if (! PQsendQuery(imp_dbh->conn, sql)) {
+ if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg:
PQsendQuery failed\n");
+ pg_error(dbh, status, PQerrorMessage(imp_dbh->conn));
+ return -2;
+ }
+ imp_dbh->async_status = 1;
+ imp_dbh->async_sth = NULL; // Needed?
+ if (dbis->debug >= 4) (void)PerlIO_printf(DBILOGFP, "dbdpg:
PQsendQuery worked\n");
+ return 0;
+ }
+
result = PQexec(imp_dbh->conn, sql);
status = _sqlstate(imp_dbh, result);
@@ -2205,6 +2254,17 @@
}
}
+ /* Check for old async transactions */
+ if (imp_dbh->async_status) {
+ if (dbis->debug >= 7) {
+ (void)PerlIO_printf
+ (DBILOGFP, "dbdpg: Attempting to handle existing async
transaction\n");
+ }
+ ret = handle_old_async(sth, imp_dbh, imp_sth->async_flag);
+ if (ret)
+ return ret;
+ }
+
/* If not autocommit, start a new transaction */
if (!imp_dbh->done_begin && !DBIc_has(imp_dbh, DBIcf_AutoCommit)) {
status = _result(imp_dbh, "begin");
@@ -2216,8 +2276,10 @@
}
/* clear old result (if any) */
- if (imp_sth->result != NULL)
+ if (imp_sth->result) {
PQclear(imp_sth->result);
+ imp_sth->result = NULL;
+ }
/*
Now, we need to build the statement to send to the backend
@@ -2348,7 +2410,11 @@
if (dbis->debug >= 5)
(void)PerlIO_printf(DBILOGFP, "dbdpg: Running
PQexecPrepared with (%s)\n", imp_sth->prepare_name);
- imp_sth->result = PQexecPrepared
+ if (imp_sth->async_flag & DBDPG_ASYNC)
+ ret = PQsendQueryPrepared
+ (imp_dbh->conn, imp_sth->prepare_name, imp_sth->numphs,
paramValues, paramLengths, paramFormats, 0);
+ else
+ imp_sth->result = PQexecPrepared
(imp_dbh->conn, imp_sth->prepare_name, imp_sth->numphs,
paramValues, paramLengths, paramFormats, 0);
} /* end new-style prepare */
@@ -2412,7 +2478,11 @@
if (dbis->debug >= 5)
(void)PerlIO_printf(DBILOGFP, "dbdpg: Running
PQexecParams with (%s)\n", statement);
- imp_sth->result = PQexecParams
+ if (imp_sth->async_flag & DBDPG_ASYNC)
+ ret = PQsendQueryParams
+ (imp_dbh->conn, statement, imp_sth->numphs,
paramTypes, paramValues, paramLengths, paramFormats, 0);
+ else
+ imp_sth->result = PQexecParams
(imp_dbh->conn, statement, imp_sth->numphs,
paramTypes, paramValues, paramLengths, paramFormats, 0);
Safefree(paramTypes);
}
@@ -2440,9 +2510,13 @@
statement[execsize] = '\0';
if (dbis->debug >= 5)
- (void)PerlIO_printf(DBILOGFP, "dbdpg: Running
PQexec with (%s)\n", statement);
+ (void)PerlIO_printf(DBILOGFP, "dbdpg: Running
%s with (%s)\n",
+
imp_sth->async_flag & 1 ? "PQsendQuery" : "PQexec", statement);
- imp_sth->result = PQexec(imp_dbh->conn, statement);
+ if (imp_sth->async_flag & DBDPG_ASYNC)
+ ret = PQsendQuery(imp_dbh->conn, statement);
+ else
+ imp_sth->result = PQexec(imp_dbh->conn, statement);
} /* end PQexec */
@@ -2450,14 +2524,25 @@
} /* end non-prepared exec */
- /* Some form of PQexec has been run at this point */
-
- status = _sqlstate(imp_dbh, imp_sth->result);
+ /* Some form of PQexec/PQsendQuery has been run at this point */
Safefree(paramValues);
Safefree(paramLengths);
Safefree(paramFormats);
+ /* If running asynchronously, we don't stick around for the result */
+ if (imp_sth->async_flag & DBDPG_ASYNC) {
+ if (dbis->debug >= 2)
+ (void)PerlIO_printf
+ (DBILOGFP, "dbdpg: Early return for async query");
+ imp_dbh->async_status = 1;
+ imp_sth->async_status = 1;
+ imp_dbh->async_sth = imp_sth;
+ return 0;
+ }
+
+ status = _sqlstate(imp_dbh, imp_sth->result);
+
imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise
*/
if (PGRES_TUPLES_OK == status) {
num_fields = PQnfields(imp_sth->result);
@@ -2641,8 +2726,11 @@
int dbd_st_finish (SV * sth, imp_sth_t * imp_sth)
{
+ D_imp_dbh_from_sth;
+
if (dbis->debug >= 4)
- (void)PerlIO_printf(DBILOGFP, "dbdpg: dbd_st_finish sth=%d\n",
sth);
+ (void)PerlIO_printf(DBILOGFP, "dbdpg: dbdpg_finish sth=%d
async=%d\n",
+ sth,
imp_dbh->async_status);
if (DBIc_ACTIVE(imp_sth) && imp_sth->result) {
PQclear(imp_sth->result);
@@ -2650,6 +2738,16 @@
imp_sth->rows = 0;
}
+ /* Are we in the middle of an async for this statement handle? */
+ if (imp_dbh->async_status) {
+ if (imp_sth->async_status) {
+ handle_old_async(sth, imp_dbh, DBDPG_OLDQUERY_WAIT);
+ }
+ }
+
+ imp_sth->async_status = 0;
+ imp_dbh->async_sth = NULL;
+
DBIc_ACTIVE_off(imp_sth);
return 1;
@@ -2759,6 +2857,10 @@
return;
}
+ if (imp_dbh->async_status) {
+ handle_old_async(sth, imp_dbh, DBDPG_OLDQUERY_WAIT);
+ }
+
/* Deallocate only if we named this statement ourselves and we still
have a good connection */
/* On rare occasions, dbd_db_destroy is called first and we can no
longer rely on imp_dbh */
if (imp_sth->prepared_by_us && DBIc_ACTIVE(imp_dbh)) {
@@ -2772,7 +2874,7 @@
Safefree(imp_sth->type_info);
Safefree(imp_sth->firstword);
- if (NULL != imp_sth->result) {
+ if (imp_sth->result) {
PQclear(imp_sth->result);
imp_sth->result = NULL;
}
@@ -2801,6 +2903,9 @@
}
imp_sth->ph = NULL;
+ if (imp_dbh->async_sth)
+ imp_dbh->async_sth = NULL;
+
DBIc_IMPSET_off(imp_sth); /* let DBI know we've done it */
} /* end of dbd_st_destroy */
@@ -3296,6 +3401,295 @@
} /* end of dbd_st_blob_read */
+
+
+/* ================================================================== */
+/* Return the result of an asynchronous query, waiting if needed */
+int dbdpg_result (h, imp_dbh)
+ SV *h;
+ imp_dbh_t *imp_dbh;
+{
+
+ PGresult *result;
+ ExecStatusType status = PGRES_FATAL_ERROR;
+ int rows;
+ char *cmdStatus = NULL;
+
+ if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
dbdpg_result\n"); }
+
+ if (1 != imp_dbh->async_status) {
+ pg_error(h, PGRES_FATAL_ERROR, "No asynchronous query is
running\n");
+ return -2;
+ }
+
+ imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise
*/
+
+ while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
+ /* TODO: Better multiple result-set handling */
+ status = _sqlstate(imp_dbh, result);
+ switch (status) {
+ case PGRES_TUPLES_OK:
+ rows = PQntuples(result);
+
+ if (imp_dbh->async_sth) {
+ imp_dbh->async_sth->cur_tuple = 0;
+ DBIc_NUM_FIELDS(imp_dbh->async_sth) = PQnfields(result);
+ DBIc_ACTIVE_on(imp_dbh->async_sth);
+ }
+
+ break;
+ case PGRES_COMMAND_OK:
+ /* non-select statement */
+ cmdStatus = PQcmdStatus(result);
+ if ((0==strncmp(cmdStatus, "DELETE", 6)) ||
(0==strncmp(cmdStatus, "INSERT", 6)) ||
+ (0==strncmp(cmdStatus, "UPDATE", 6))) {
+ rows = atoi(PQcmdTuples(result));
+ }
+ break;
+ case PGRES_COPY_OUT:
+ case PGRES_COPY_IN:
+ /* Copy Out/In data transfer in progress */
+ imp_dbh->copystate = status;
+ rows = -1;
+ break;
+ case PGRES_EMPTY_QUERY:
+ case PGRES_BAD_RESPONSE:
+ case PGRES_NONFATAL_ERROR:
+ rows = -2;
+ pg_error(h, status, PQerrorMessage(imp_dbh->conn));
+ break;
+ case PGRES_FATAL_ERROR:
+ default:
+ rows = -2;
+ pg_error(h, status, PQerrorMessage(imp_dbh->conn));
+ break;
+ }
+
+ if (imp_dbh->async_sth) {
+ if (imp_dbh->async_sth->result) /* For potential multi-result
sets */
+ PQclear(imp_dbh->async_sth->result);
+ imp_dbh->async_sth->result = result;
+ }
+ else {
+ PQclear(result);
+ }
+ }
+
+ if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
dbdpg_result returning %d\n", rows); }
+ if (imp_dbh->async_sth) {
+ imp_dbh->async_sth->rows = rows;
+ imp_dbh->async_sth->async_status = 0;
+ }
+ imp_dbh->async_status = 0;
+ return rows;
+
+} /* end of dbdpg_result */
+
+
+/*
+==================================================================
+Indicates if an asynchronous query has finished yet
+Accepts either a database or a statement handle
+Returns:
+ -1 if no query is running (and raises an exception)
+ +1 if the query is finished
+ 0 if the query is still running
+ -2 for other errors
+==================================================================
+*/
+
+int dbdpg_ready (h, imp_dbh)
+ SV *h;
+ imp_dbh_t *imp_dbh;
+{
+
+ if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
pg_st_ready\n"); }
+
+ if (0 == imp_dbh->async_status) {
+ pg_error(h, PGRES_FATAL_ERROR, "No asynchronous query is
running\n");
+ return -1;
+ }
+
+ if (!PQconsumeInput(imp_dbh->conn)) {
+ pg_error(h, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn));
+ return -2;
+ }
+
+ return ! PQisBusy(imp_dbh->conn);
+
+} /* end of dbdpg_ready */
+
+
+/*
+Attempt to cancel a running asynchronous query
+Returns true if the cancel succeeded, and false if it did not
+If it did successfully cancel the query, it will also do a rollback.
+Note that queries which have finished do not cause a rollback.
+In this case, pg_cancel will return false.
+NOTE: We only return true if we cancelled and rolled back!
+*/
+
+int dbdpg_cancel(h, imp_dbh)
+ SV *h;
+ imp_dbh_t *imp_dbh;
+{
+
+ PGcancel *cancel;
+ char errbuf[256];
+ PGresult *result;
+ ExecStatusType status;
+
+ if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
dbdpg_cancel, async=%d\n", imp_dbh->async_status); }
+
+ if (0 == imp_dbh->async_status) {
+ pg_error(h, PGRES_FATAL_ERROR, "No asynchronous query is running");
+ return DBDPG_FALSE;
+ }
+
+ if (-1 == imp_dbh->async_status) {
+ pg_error(h, PGRES_FATAL_ERROR, "Asychronous query has already been
cancelled");
+ return DBDPG_FALSE;
+ }
+
+ /* Get the cancel structure */
+ cancel = PQgetCancel(imp_dbh->conn);
+
+ /* This almost always works. If not, free our structure and complain
looudly */
+ if (! PQcancel(cancel,errbuf,sizeof(errbuf))) {
+ PQfreeCancel(cancel);
+ if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
PQcancel failed: %s\n", errbuf); }
+ pg_error(h, PGRES_FATAL_ERROR, "PQcancel failed");
+ return DBDPG_FALSE;
+ }
+ PQfreeCancel(cancel);
+
+ /* Whatever else happens, we should no longer be inside of an async
query */
+ imp_dbh->async_status = -1;
+ if (imp_dbh->async_sth)
+ imp_dbh->async_sth->async_status = -1;
+
+ /* Read in the result - assume only one */
+ result = PQgetResult(imp_dbh->conn);
+ if (!result) {
+ pg_error(h, PGRES_FATAL_ERROR, "Failed to get a result after
PQcancel");
+ return DBDPG_FALSE;
+ }
+
+ status = _sqlstate(imp_dbh, result);
+
+ /* If we actually cancelled a running query, perform a rollback */
+ if (0 == strncmp(imp_dbh->sqlstate, "57014", 5)) {
+ if (dbis->debug >= 0) { (void)PerlIO_printf(DBILOGFP, "dbdpg: Rolling
back after cancelled query\n"); }
+ dbd_db_rollback(h, imp_dbh);
+ // PQexec(imp_dbh->conn, "ROLLBACK");
+ return DBDPG_TRUE;
+ }
+
+ /* If we got any other error, make sure we report it */
+ if (0 != strncmp(imp_dbh->sqlstate, "00000", 5)) {
+ if (dbis->debug >= 0) { (void)PerlIO_printf(DBILOGFP, "dbdpg: Query
was not cancelled: was already finished\n"); }
+ pg_error(h, status, PQerrorMessage(imp_dbh->conn));
+ }
+
+ return DBDPG_FALSE;
+
+} /* end of dbdpg_cancel */
+
+
+int dbdpg_cancel_sth(sth, imp_sth)
+ SV *sth;
+ imp_sth_t *imp_sth;
+{
+
+ D_imp_dbh_from_sth;
+ bool cancel_result;
+
+ cancel_result = dbdpg_cancel(sth, imp_dbh);
+
+ dbd_st_finish(sth, imp_sth);
+
+ return cancel_result;
+
+} /* end of dbdpg_cancel */
+
+
+/*
+Finish up an existing async query, either by cancelling it,
+or by waiting for a result.
+
+ */
+static int handle_old_async(SV * handle, imp_dbh_t * imp_dbh, int asyncflag)
+{
+
+ PGresult *result;
+ ExecStatusType status;
+
+ if (dbis->debug >= 4) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
handle_old_sync flag=%d\n", asyncflag); }
+
+ if (asyncflag & DBDPG_OLDQUERY_CANCEL) {
+ /* Cancel the outstanding query */
+ if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
Cancelling old async command\n"); }
+ if (PQisBusy(imp_dbh->conn)) {
+ PGcancel *cancel;
+ char errbuf[256];
+ int cresult;
+ if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
Attempting to cancel query\n"); }
+ cancel = PQgetCancel(imp_dbh->conn);
+ cresult = PQcancel(cancel,errbuf,255);
+ if (! cresult) {
+ if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg:
PQcancel failed: %s\n", errbuf); }
+ pg_error(handle, PGRES_FATAL_ERROR, "Could not cancel previous
command");
+ return -2;
+ }
+ PQfreeCancel(cancel);
+ /* Suck up the cancellation notice */
+ while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
+ }
+ /* We need to rollback! - reprepare!? */
+ PQexec(imp_dbh->conn, "rollback");
+ imp_dbh->done_begin = DBDPG_FALSE;
+ }
+ }
+ else if (asyncflag & DBDPG_OLDQUERY_WAIT || imp_dbh->async_status == -1) {
+ /* Finish up the outstanding query and throw out the result, unless an
error */
+ if (dbis->debug >= 1) { (void)PerlIO_printf(DBILOGFP, "dbdpg: Waiting
for old async command to finish\n"); }
+ while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
+ status = _sqlstate(imp_dbh, result);
+ PQclear(result);
+ if (status == PGRES_COPY_IN) { /* In theory, this should be caught by
copystate, but we'll be careful */
+ if (-1 == PQputCopyEnd(imp_dbh->conn, NULL)) {
+ pg_error(handle, PGRES_FATAL_ERROR,
PQerrorMessage(imp_dbh->conn));
+ return -2;
+ }
+ }
+ else if (status == PGRES_COPY_OUT) { /* Won't be as nice with this
one */
+ pg_error(handle, PGRES_FATAL_ERROR, "Must finish copying
first");
+ return -2;
+ }
+ else if (status != PGRES_EMPTY_QUERY
+ && status != PGRES_COMMAND_OK
+ && status != PGRES_TUPLES_OK) {
+ pg_error(handle, status, PQerrorMessage(imp_dbh->conn));
+ return -2;
+ }
+ }
+ }
+ else {
+ pg_error(handle, PGRES_FATAL_ERROR, "Cannot execute until previous
async query has finished");
+ return -2;
+ }
+
+ /* If we made it this far, safe to assume there is no running query */
+ imp_dbh->async_status = 0;
+ if (imp_dbh->async_sth)
+ imp_dbh->async_sth->async_status = 0;
+
+ return 0;
+
+} /* end of handle_old_async */
+
+
+
/*
Some information to keep you sane:
typedef enum
Modified: DBD-Pg/trunk/dbdimp.h
==============================================================================
--- DBD-Pg/trunk/dbdimp.h (original)
+++ DBD-Pg/trunk/dbdimp.h Thu Jul 12 07:30:57 2007
@@ -33,7 +33,9 @@
int copystate; /* 0=none PGRES_COPY_IN PGRES_COPY_OUT */
int pg_errorlevel; /* PQsetErrorVerbosity. Set by user,
defaults to 1 */
int server_prepare; /* do we want to use PQexecPrepared? 0=no
1=yes 2=smart. Can be changed by user */
+ int async_status; /* 0=no async 1=async started -1=async has
been cancelled */
+ imp_sth_t *async_sth; /* current async statement handle */
AV *savepoints; /* list of savepoints */
PGconn *conn; /* connection structure */
char *sqlstate; /* from the last result */
@@ -76,6 +78,8 @@
int numbound; /* how many placeholders were explicitly bound
by the client, not us */
int cur_tuple; /* current tuple being fetched */
int rows; /* number of affected rows */
+ int async_flag; /* async? 0=no 1=async 2=cancel 4=wait */
+ int async_status; /* 0=no async 1=async started -1=async has
been cancelled */
STRLEN totalsize; /* total string length of the statement (with
no placeholders)*/
@@ -122,7 +126,11 @@
int pg_db_lo_unlink (SV *dbh, unsigned int lobjId);
unsigned int pg_db_lo_import (SV *dbh, char *filename);
int pg_db_lo_export (SV *dbh, unsigned int lobjId, char *filename);
-int pg_quickexec (SV *dbh, const char *sql);
+int pg_quickexec (SV *dbh, const char *sql, int asyncflag);
+int dbdpg_ready (SV *dbh, imp_dbh_t *imp_dbh);
+int dbdpg_result (SV *dbh, imp_dbh_t *imp_dbh);
+int dbdpg_cancel (SV *h, imp_dbh_t *imp_dbh);
+int dbdpg_cancel_sth (SV *sth, imp_sth_t *imp_sth);
/* end of dbdimp.h */
Added: DBD-Pg/trunk/t/08async.t
==============================================================================
--- (empty file)
+++ DBD-Pg/trunk/t/08async.t Thu Jul 12 07:30:57 2007
@@ -0,0 +1,385 @@
+#!perl -w
+
+# Test asynchronous queries
+
+#use Test::More qw/no_plan/;
+use Test::More;
+use Time::HiRes qw/sleep/;
+use DBI;
+use DBD::Pg ':async';
+
+use strict;
+$|=1;
+
+if (defined $ENV{DBI_DSN}) {
+ plan tests => 67;
+}
+else {
+ plan skip_all => 'Cannot run test unless DBI_DSN is defined. See the
README file';
+}
+
+my $dbh = DBI->connect($ENV{DBI_DSN}, $ENV{DBI_USER}, $ENV{DBI_PASS},
+ {RaiseError => 1, PrintError => 0,
AutoCommit => 0});
+ok( defined $dbh, "Connect to database for async testing");
+
+my ($t,$sth,$count,$res,$expected,@data);
+my $pglibversion = $dbh->{pg_lib_version};
+my $pgversion = $dbh->{pg_server_version};
+my $table = 'dbd_pg_test1';
+
+
+## First, test out do() in all its variants
+
+$t=q{Method do() works as expected with no args };
+eval {
+ $res = $dbh->do("SELECT 123");
+};
+is($@, q{}, $t);
+is($res, 1, $t);
+
+$t=q{Method do() works as expected with an unused attribute };
+eval {
+ $res = $dbh->do("SELECT 123", {pg_nosuch => 'arg'});
+};
+is($@, q{}, $t);
+is($res, 1, $t);
+
+$t=q{Method do() works as expected with an unused attribute and a non-prepared
param };
+eval {
+ $res = $dbh->do("SET random_page_cost TO ?", undef, '2.2');
+};
+is($@, q{}, $t);
+is($res, '0E0', $t);
+
+$t=q{Method do() works as expected with an unused attribute and multiple real
bind params };
+eval {
+ $res = $dbh->do("SELECT count(*) FROM pg_class WHERE reltuples IN
(?,?,?)", undef, 1,2,3);
+};
+is($@, q{}, $t);
+is($res, 1, $t);
+
+$t=q{Cancelling a non-async do() query gives an error };
+eval {
+ $res = $dbh->pg_cancel();
+};
+like($@, qr{No asynchronous query is running}, $t);
+
+$t=q{Method do() works as expected with an asychronous flag };
+eval {
+ $res = $dbh->do("SELECT 123", {pg_async => DBDPG_ASYNC});
+};
+is($@, q{}, $t);
+is($res, '0E0', $t);
+
+$t=q{Database attribute "async_status" returns 1 after async query};
+$res = $dbh->{pg_async_status};
+is($res, +1, $t);
+
+$t=q{Cancelling an async do() query works };
+eval {
+ $res = $dbh->pg_cancel();
+};
+is($@, q{}, $t);
+
+$t=q{Database method pg_cancel returns a false value when cancellation works
but finished};
+is($res, q{}, $t);
+
+$t=q{Database attribute "async_status" returns -1 after pg_cancel};
+$res = $dbh->{pg_async_status};
+is($res, -1, $t);
+
+$t=q{Running do() after a cancelled query works};
+eval {
+ $res = $dbh->do("SELECT 123");
+};
+is($@, q{}, $t);
+
+$t=q{Database attribute "async_status" returns 0 after normal query run};
+$res = $dbh->{pg_async_status};
+is($res, 0, $t);
+
+$t=q{Method pg_ready() fails after a non-async query};
+eval {
+ $dbh->pg_ready();
+};
+like($@, qr{No async}, $t);
+
+$res = $dbh->do("SELECT 123", {pg_async => DBDPG_ASYNC});
+$t=q{Method pg_ready() works after a non-async query};
+## Sleep a sub-second to make sure the server has caught up
+sleep 0.2;
+eval {
+ $res = $dbh->pg_ready();
+};
+is($@, q{}, $t);
+
+$t=q{Database method pg_ready() returns 1 after a completed async do()};
+is($res, 1, $t);
+
+$res = $dbh->pg_ready();
+$t=q{Database method pg_ready() returns true when called a second time};
+is($res, 1, $t);
+
+$t=q{Database method pg_ready() returns 1 after a completed async do()};
+is($res, 1, $t);
+$t=q{Cancelling an async do() query works };
+eval {
+ $res = $dbh->pg_cancel();
+};
+is($@, q{}, $t);
+$t=q{Database method pg_cancel() returns expected false value for completed
value};
+is($res, q{}, $t);
+
+$t=q{Method do() runs after pg_cancel has cleared the async query};
+eval {
+ $dbh->do("SELECT 456");
+};
+is($@, q{}, $t);
+
+$dbh->do("SELECT 'async2'", {pg_async => DBDPG_ASYNC});
+
+$t=q{Method do() fails when async query has not been cleared};
+eval {
+ $dbh->do("SELECT 'async_blocks'");
+};
+like($@, qr{previous async}, $t);
+
+$t=q{Database method pg_result works as expected};
+eval {
+ $res = $dbh->pg_result();
+};
+is($@, q{}, $t);
+
+$t=q{Database method pg_result() returns correct value};
+is($res, 1, $t);
+
+$t=q{Database method pg_result() fails when called twice};
+eval {
+ $dbh->pg_result();
+};
+like($@, qr{No async}, $t);
+
+$t=q{Database method pg_cancel() fails when called after pg_result()};
+eval {
+ $dbh->pg_cancel();
+};
+like($@, qr{No async}, $t);
+
+$t=q{Database method pg_ready() fails when called after pg_result()};
+eval {
+ $dbh->pg_ready();
+};
+like($@, qr{No async}, $t);
+
+$t=q{Database method do() works after pg_result()};
+eval {
+ $dbh->do("SELECT 123");
+};
+is($@, q{}, $t);
+
+SKIP: {
+
+
+ if ($pgversion < 80000) {
+ skip "Need pg_sleep() to perform rest of async tests: your
Postgres is too old", 10;
+ }
+
+ my $time = time();
+ $res = $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+ $time = time()-$time;
+ $t = qq{Database method do() returns right away when in async mode};
+ cmp_ok($time, '<=', 1, $t);
+
+ $t=q{Method pg_ready() returns false when query is still running};
+ $res = $dbh->pg_ready();
+ ok(!$res, $t);
+
+ pass("Sleeping to allow query to finish");
+ sleep(2);
+ $t=q{Method pg_ready() returns true when query is finished};
+ $res = $dbh->pg_ready();
+ ok($res, $t);
+
+ $t=q{Method do() will not work if async query not yet cleared};
+ eval {
+ $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+ };
+ like($@, qr{previous async}, $t);
+
+ $t=q{Database method pg_cancel() works while async query is running};
+ eval {
+ $res = $dbh->pg_cancel();
+ };
+ is($@, q{}, $t);
+ $t=q{Database method pg_cancel returns false when query has already
finished};
+ ok(!$res,$t);
+
+ $t=q{Database method pg_result() fails after async query has been
cancelled};
+ eval {
+ $res = $dbh->pg_result();
+ };
+ like($@, qr{No async}, $t);
+
+ $t=q{Database method do() cancels the previous async when requested};
+ eval {
+ $res = $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC
+ DBDPG_OLDQUERY_CANCEL});
+ };
+ is($@, q{}, $t);
+
+ $t=q{Database method pg_result works when async query is still running};
+ eval {
+ $res = $dbh->pg_result();
+ };
+ is($@, q{}, $t);
+
+ ## Now throw in some execute after the do()
+ $sth = $dbh->prepare("SELECT 567");
+
+ $t = q{Running execute after async do() gives an error};
+ $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+ eval {
+ $res = $sth->execute();
+ };
+ like($@, qr{previous async}, $t);
+
+ $t = q{Running execute after async do() works when told to cancel};
+ $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_OLDQUERY_CANCEL});
+ eval {
+ $sth->execute();
+ };
+ is($@, q{}, $t);
+
+ $t = q{Running execute after async do() works when told to wait};
+ $dbh->do("SELECT pg_sleep(2)", {pg_async => DBDPG_ASYNC});
+ $sth = $dbh->prepare("SELECT 678", {pg_async => DBDPG_OLDQUERY_WAIT});
+ eval {
+ $sth->execute();
+ };
+ is($@, q{}, $t);
+
+ $sth->finish();
+
+}; ## end of pg_sleep skip
+
+
+$t=q{Method execute() works when prepare has DBDPG_ASYNC flag};
+$sth = $dbh->prepare("SELECT 123", {pg_async => DBDPG_ASYNC});
+eval {
+ $sth->execute();
+};
+is($@, q{}, $t);
+
+$t=q{Database attribute "async_status" returns 1 after prepare async};
+$res = $dbh->{pg_async_status};
+is($res, 1, $t);
+
+$t=q{Method do() fails when previous async prepare has been executed};
+eval {
+ $dbh->do("SELECT 123");
+};
+like($@, qr{previous async}, $t);
+
+$t=q{Method execute() fails when previous async prepare has been executed};
+eval {
+ $sth->execute();
+};
+like($@, qr{previous async}, $t);
+
+$t=q{Database method pg_cancel works if async query has already finished};
+sleep 0.5;
+eval {
+ $res = $sth->pg_cancel();
+};
+is($@, q{}, $t);
+
+$t=q{Statement method pg_cancel() returns a false value when cancellation
works but finished};
+is($res, q{}, $t);
+
+$t=q{Method do() fails when previous execute async has not been cleared};
+$sth->execute();
+$sth->finish(); ## Ideally, this would clear out the async, but it cannot at
the moment
+eval {
+ $dbh->do("SELECT 345");
+};
+like($@, qr{previous async}, $t);
+
+$dbh->pg_cancel;
+
+$t=q{Directly after pg_cancel(), pg_async_status is -1};
+is($dbh->{pg_async_status}, -1, $t);
+
+$t=q{Method execute() works when prepare has DBDPG_ASYNC flag};
+$sth->execute();
+
+$t=q{After async execute, pg_async_status is 1};
+is($dbh->{pg_async_status}, 1, $t);
+
+$t=q{Method pg_result works after a prepare/execute call};
+eval {
+ $res = $dbh->pg_result;
+};
+is($@, q{}, $t);
+
+$t=q{Method pg_result() returns expected result after prepare/execute select};
+is($res, 1, $t);
+
+$t=q{Method fetchall_arrayref works after pg_result};
+eval {
+ $res = $sth->fetchall_arrayref();
+};
+is($@, q{}, $t);
+
+$t=q{Method fetchall_arrayref returns correct result after pg_result};
+is_deeply($res, [[123]], $t);
+
+eval {
+ $dbh->do("DROP TABLE dbdpg_async_test");
+};
+$dbh->do("CREATE TABLE dbdpg_async_test(id INT, t TEXT)");
+$dbh->commit();
+$sth->execute();
+
+$t=q{Method prepare() works when passed in DBDPG_OLDQUERY_CANCEL};
+
+my $sth2;
+my $SQL = "INSERT INTO dbdpg_async_test(id) SELECT 123 UNION SELECT 456";
+eval {
+ $sth2 = $dbh->prepare($SQL, {pg_async => DBDPG_ASYNC +
DBDPG_OLDQUERY_CANCEL});
+};
+is($@, q{}, $t);
+
+$t=q{Fetch on cancelled statement handle fails};
+eval {
+ $sth->fetch();
+};
+like($@, qr{no statement executing}, $t);
+
+$t=q{Method execute works after async + cancel prepare};
+eval {
+ $sth2->execute();
+};
+is($@, q{}, $t);
+
+$t=q{Statement method pg_result works on async statement handle};
+eval {
+ $res = $sth2->pg_result();
+};
+is($@, q{}, $t);
+
+$t=q{Statement method pg_result returns correct result after execute};
+is($res, 2, $t);
+
+$sth2->execute();
+
+$t=q{Database method pg_result works on async statement handle};
+eval {
+ $res = $sth2->pg_result();
+};
+is($@, q{}, $t);
+$t=q{Database method pg_result returns correct result after execute};
+is($res, 2, $t);
+
+## TODO: More pg_sleep tests with execute
+
+ok( $dbh->disconnect(), 'Disconnect from database');
+