This causes the polling interval to grow rapidly (roughly doubling after
each empty poll), heading towards sync_interval_timeout.
If it reaches sync_interval_timeout, we switch from the POLL state to the
LISTEN state and start listening for the node's Event notifications.
When events arrive again, we switch back to POLL mode.
So, we'll generally see two kinds of behaviour:
1. If your application applies changes very infrequently, then
you'll mostly LISTEN, waking up once in a while when there's an update.
Few events generated means few dead pg_listener tuples.
2. If your application applies changes very frequently, then you'll
mostly POLL, which generates exactly 0 dead pg_listener tuples :-) .
Both modes share the property of producing few dead pg_listener tuples.
This would doubtless resolve the common desire for "fewer dead
pg_listener tuples," particularly in combination with a previous patch
that eliminates using LISTEN/NOTIFY for event confirmations.
Modified Files:
--------------
slony1-engine/src/slon:
local_listen.c (r1.35 -> r1.36)
remote_listen.c (r1.27 -> r1.28)
------------------------------------------------------------------------
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.27
retrieving revision 1.28
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.27 -r1.28
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -58,6 +58,10 @@
static int remoteListen_receive_events(SlonNode * node,
SlonConn * conn, struct listat *
listat);
+static int poll_sleep;
+enum pstate_enum {POLL=1, LISTEN};
+static enum pstate_enum pstate;
+
extern char *lag_interval;
/* ----------
@@ -98,8 +102,11 @@
listat_tail = NULL;
dstring_init(&query1);
+ poll_sleep = 0;
+ pstate = POLL; /* Initially, start in Polling mode */
+
sprintf(conn_symname, "node_%d_listen", node->no_id);
-/* sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name); */
+ sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name);
/*
* Work until doomsday
@@ -229,13 +236,23 @@
* register the node connection.
*/
slon_mkquery(&query1,
- "listen \"_%s_Event\"; "
+ /* "listen \"_%s_Event\"; " */
/* skip confirms "listen \"_%s_Confirm\";
" */
- "select %s.registerNodeConnection(%d); ",
- rtcfg_cluster_name, /* rtcfg_cluster_name,
*/
+ "select _%s.registerNodeConnection(%d); ",
+ /* rtcfg_cluster_name, */
rtcfg_namespace, rtcfg_nodeid);
+
+ if (pstate == LISTEN) {
+ slon_appendquery(&query1,
+ "listen \"_%s_Event\"; ",
+ rtcfg_cluster_name);
+ } else {
+ slon_appendquery(&query1,
+ "unlisten \"_%s_Event\"; ",
+ rtcfg_cluster_name);
+ }
res = PQexec(dbconn, dstring_data(&query1));
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR,
"remoteListenThread_%d: \"%s\" - %s",
@@ -299,6 +316,7 @@
/*
* Receive events from the provider node
*/
+ enum pstate_enum oldpstate = pstate;
rc = remoteListen_receive_events(node, conn, listat_head);
if (rc < 0)
{
@@ -313,6 +331,41 @@
continue;
}
+ if (oldpstate != pstate) { /* Switched states... */
+ switch (pstate) {
+ case POLL:
+ slon_log(SLON_DEBUG2,
+ "remoteListenThread_%d: UNLISTEN\n",
+ node->no_id);
+
+ slon_mkquery(&query1,
+ "unlisten \"_%s_Event\"; ",
+ rtcfg_cluster_name);
+ break;
+ case LISTEN:
+ slon_log(SLON_DEBUG2,
+ "remoteListenThread_%d: LISTEN\n",
+ node->no_id);
+ slon_mkquery(&query1,
+ "listen \"_%s_Event\"; ",
+ rtcfg_cluster_name);
+ break;
+ }
+ res = PQexec(dbconn, dstring_data(&query1));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ slon_log(SLON_ERROR,
+ "remoteListenThread_%d: \"%s\" - %s",
+ node->no_id,
+ dstring_data(&query1),
PQresultErrorMessage(res));
+ PQclear(res);
+ slon_disconnectdb(conn);
+ free(conn_conninfo);
+ conn = NULL;
+ conn_conninfo = NULL;
+ continue;
+ }
+ }
/*
* If the remote node notified for new confirmations, read them
and
@@ -343,7 +396,7 @@
/*
* Wait for notification.
*/
- rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 10000);
+ rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep);
if (rc == SCHED_STATUS_CANCEL)
continue;
if (rc != SCHED_STATUS_OK)
@@ -746,6 +799,16 @@
(PQgetisnull(res, tupno, 14)) ? NULL : PQgetvalue(res, tupno,
14));
}
+ if (ntuples > 0) {
+ poll_sleep = 0;
+ pstate = POLL;
+ } else {
+ poll_sleep = poll_sleep * 2 + sync_interval;
+ if (poll_sleep > sync_interval_timeout) {
+ poll_sleep = sync_interval_timeout;
+ pstate = LISTEN;
+ }
+ }
PQclear(res);
return 0;
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.35
retrieving revision 1.36
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.35 -r1.36
--- src/slon/local_listen.c
+++ src/slon/local_listen.c
@@ -49,6 +49,7 @@
PGnotify *notification;
char restart_notify[256];
int restart_request;
+ int poll_sleep = 0;
slon_log(SLON_DEBUG1, "localListenThread: thread starts\n");
@@ -69,9 +70,10 @@
* Listen for local events
*/
slon_mkquery(&query1,
- "listen \"_%s_Event\"; "
+ /* "listen \"_%s_Event\"; " */
"listen \"_%s_Restart\"; ",
- rtcfg_cluster_name, rtcfg_cluster_name);
+ /* rtcfg_cluster_name, */
+ rtcfg_cluster_name);
res = PQexec(dbconn, dstring_data(&query1));
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -636,6 +638,7 @@
*/
if (ntuples > 0)
{
+ poll_sleep = 0; /* drop polling time back to 0... */
res = PQexec(dbconn, "commit transaction");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -654,6 +657,12 @@
/*
* No database events received. Rollback instead.
*/
+
+ /* Increase the amount of time to sleep, to a max of
sync_interval_timeout */
+ poll_sleep += sync_interval;
+ if (poll_sleep > sync_interval_timeout) {
+ poll_sleep = sync_interval_timeout;
+ }
res = PQexec(dbconn, "rollback transaction;");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
@@ -668,9 +677,9 @@
}
/*
- * Wait for notify
+ * Wait for notify or for timeout
*/
- if (sched_wait_conn(conn, SCHED_WAIT_SOCK_READ) !=
SCHED_STATUS_OK)
+ if (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep) !=
SCHED_STATUS_OK)
break;
}
------------------------------------------------------------------------
_______________________________________________
Slony1-commit mailing list
[EMAIL PROTECTED]
http://gborg.postgresql.org/mailman/listinfo/slony1-commit