On Tue, Oct 29, 2013 at 11:26 AM, Nigel Heron <nhe...@querymetrics.com> wrote:
>>
>> So, for now, the counters only track sockets created from an inbound
>> (client to server) connection.
>
> here's v3 of the patch (rebase and cleanup).
>

Hi,
here's v4 of the patch. I added documentation and a new global view
called "pg_stat_socket" (includes bytes_sent, bytes_received and
stats_reset time)

thanks,
-nigel.
*** a/doc/src/sgml/monitoring.sgml
--- b/doc/src/sgml/monitoring.sgml
***************
*** 278,283 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
--- 278,291 ----
       </row>
  
       <row>
+       <entry><structname>pg_stat_socket</><indexterm><primary>pg_stat_socket</primary></indexterm></entry>
+       <entry>One row only, showing statistics about the
+        cluster's communication socket activity. See
+        <xref linkend="pg-stat-socket-view"> for details.
+      </entry>
+      </row>
+ 
+      <row>
        <entry><structname>pg_stat_database</><indexterm><primary>pg_stat_database</primary></indexterm></entry>
        <entry>One row per database, showing database-wide statistics. See
         <xref linkend="pg-stat-database-view"> for details.
***************
*** 627,632 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
--- 635,650 ----
        that was executed.
       </entry>
      </row>
+     <row>
+      <entry><structfield>bytes_sent</></entry>
+      <entry><type>bigint</></entry>
+      <entry>Number of bytes sent over this backend's communication socket</entry>
+     </row>
+     <row>
+      <entry><structfield>bytes_received</></entry>
+      <entry><type>bigint</></entry>
+      <entry>Number of bytes received over this backend's communication socket</entry>
+     </row>
     </tbody>
     </tgroup>
    </table>
***************
*** 735,740 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
--- 753,801 ----
     single row, containing global data for the cluster.
    </para>
  
+   <table id="pg-stat-socket-view" xreflabel="pg_stat_socket">
+    <title><structname>pg_stat_socket</structname> View</title>
+ 
+    <tgroup cols="3">
+     <thead>
+     <row>
+       <entry>Column</entry>
+       <entry>Type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+ 
+     <tbody>
+      <row>
+       <entry><structfield>bytes_sent</></entry>
+       <entry><type>bigint</type></entry>
+       <entry>
+         Number of bytes sent over communication sockets.
+       </entry>
+      </row>
+      <row>
+       <entry><structfield>bytes_received</></entry>
+       <entry><type>bigint</type></entry>
+       <entry>
+         Number of bytes received over communication sockets.
+       </entry>
+      </row>
+      <row>
+       <entry><structfield>stats_reset</></entry>
+       <entry><type>timestamp with time zone</type></entry>
+       <entry>Time at which these statistics were last reset</entry>
+      </row>
+     </tbody>
+     </tgroup>
+   </table>
+ 
+   <para>
+    The <structname>pg_stat_socket</structname> view will always have a
+    single row, containing global data for the cluster.
+    Only sockets created from inbound client connections are tracked (Unix sockets and TCP).
+    Streaming replication traffic is counted on the master, but not on the slave (see <xref linkend="pg-stat-replication-view"> for details).
+   </para>
+ 
    <table id="pg-stat-database-view" xreflabel="pg_stat_database">
     <title><structname>pg_stat_database</structname> View</title>
     <tgroup cols="3">
***************
*** 859,864 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
--- 920,935 ----
        in milliseconds</entry>
      </row>
      <row>
+      <entry><structfield>bytes_sent</></entry>
+      <entry><type>bigint</></entry>
+      <entry>Number of bytes sent over backend communication sockets in this database</entry>
+     </row>
+     <row>
+      <entry><structfield>bytes_received</></entry>
+      <entry><type>bigint</></entry>
+      <entry>Number of bytes received over backend communication sockets in this database</entry>
+     </row>
+     <row>
       <entry><structfield>stats_reset</></entry>
       <entry><type>timestamp with time zone</></entry>
       <entry>Time at which these statistics were last reset</entry>
***************
*** 1417,1422 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
--- 1488,1503 ----
       </entry>
      </row>
      <row>
+      <entry><structfield>bytes_sent</></entry>
+      <entry><type>bigint</></entry>
+      <entry>Number of bytes sent over this WAL sender's communication socket</entry>
+     </row>
+     <row>
+      <entry><structfield>bytes_received</></entry>
+      <entry><type>bigint</></entry>
+      <entry>Number of bytes received over this WAL sender's communication socket</entry>
+     </row>
+     <row>
       <entry><structfield>state</></entry>
       <entry><type>text</></entry>
       <entry>Current WAL sender state</entry>
***************
*** 1613,1618 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
--- 1694,1701 ----
         argument (requires superuser privileges).
         Calling <literal>pg_stat_reset_shared('bgwriter')</> will zero all the
         counters shown in the <structname>pg_stat_bgwriter</> view.
+        Calling <literal>pg_stat_reset_shared('socket')</> will zero all the
+        counters shown in the <structname>pg_stat_socket</> view.
        </entry>
       </row>
  
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 586,592 **** CREATE VIEW pg_stat_activity AS
              S.state_change,
              S.waiting,
              S.state,
!             S.query
      FROM pg_database D, pg_stat_get_activity(NULL) AS S, pg_authid U
      WHERE S.datid = D.oid AND
              S.usesysid = U.oid;
--- 586,594 ----
              S.state_change,
              S.waiting,
              S.state,
!             S.query,
!             S.bytes_sent,
!             S.bytes_received
      FROM pg_database D, pg_stat_get_activity(NULL) AS S, pg_authid U
      WHERE S.datid = D.oid AND
              S.usesysid = U.oid;
***************
*** 601,606 **** CREATE VIEW pg_stat_replication AS
--- 603,610 ----
              S.client_hostname,
              S.client_port,
              S.backend_start,
+             S.bytes_sent,
+             S.bytes_received,
              W.state,
              W.sent_location,
              W.write_location,
***************
*** 634,639 **** CREATE VIEW pg_stat_database AS
--- 638,645 ----
              pg_stat_get_db_deadlocks(D.oid) AS deadlocks,
              pg_stat_get_db_blk_read_time(D.oid) AS blk_read_time,
              pg_stat_get_db_blk_write_time(D.oid) AS blk_write_time,
+             pg_stat_get_db_bytes_sent(D.oid) AS bytes_sent,
+             pg_stat_get_db_bytes_received(D.oid) AS bytes_received,
              pg_stat_get_db_stat_reset_time(D.oid) AS stats_reset
      FROM pg_database D;
  
***************
*** 686,691 **** CREATE VIEW pg_stat_bgwriter AS
--- 692,703 ----
          pg_stat_get_buf_alloc() AS buffers_alloc,
          pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
  
+ CREATE VIEW pg_stat_socket AS -- single-row view of cluster-wide socket byte counters
+     SELECT
+         pg_stat_get_bytes_sent() AS bytes_sent,
+         pg_stat_get_bytes_received() AS bytes_received,
+         pg_stat_get_socket_stat_reset_time() AS stats_reset;
+ 
  CREATE VIEW pg_user_mappings AS
      SELECT
          U.oid       AS umid,
*** a/src/backend/libpq/be-secure.c
--- b/src/backend/libpq/be-secure.c
***************
*** 74,80 ****
  #include "libpq/libpq.h"
  #include "tcop/tcopprot.h"
  #include "utils/memutils.h"
! 
  
  #ifdef USE_SSL
  
--- 74,80 ----
  #include "libpq/libpq.h"
  #include "tcop/tcopprot.h"
  #include "utils/memutils.h"
! #include "pgstat.h"
  
  #ifdef USE_SSL
  
***************
*** 307,312 **** rloop:
--- 307,318 ----
  		n = recv(port->sock, ptr, len, 0);
  
  		client_read_ended();
+ 
+ 		if (n > 0)
+ 		{
+ 			/* we received data from the socket that needs to be reported */
+ 			pgstat_report_commreceived(n);
+ 		}
  	}
  
  	return n;
***************
*** 441,447 **** wloop:
--- 447,460 ----
  	}
  	else
  #endif
+ 	{
  		n = send(port->sock, ptr, len, 0);
+ 		if (n > 0)
+ 		{
+ 			/* we sent data over the socket that needs to be reported */
+ 			pgstat_report_commsent(n);
+ 		}
+ 	}
  
  	return n;
  }
***************
*** 488,493 **** my_sock_read(BIO *h, char *buf, int size)
--- 501,512 ----
  
  	client_read_ended();
  
+ 	if (res > 0)
+ 	{
+ 		/* we received data from the socket that needs to be reported */
+ 		pgstat_report_commreceived(res);
+ 	}
+ 
  	return res;
  }
  
***************
*** 504,509 **** my_sock_write(BIO *h, const char *buf, int size)
--- 523,533 ----
  			BIO_set_retry_write(h);
  		}
  	}
+ 	else
+ 	{
+ 		/* we sent data over the socket that needs to be reported */
+ 		pgstat_report_commsent(res);
+ 	}
  
  	return res;
  }
*** a/src/backend/postmaster/pgstat.c
--- b/src/backend/postmaster/pgstat.c
***************
*** 298,303 **** static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
--- 298,305 ----
  static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
  static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
  static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+ static void pgstat_recv_commsent(PgStat_MsgComm *msg, int len);
+ static void pgstat_recv_commreceived(PgStat_MsgComm *msg, int len);
  
  /* ------------------------------------------------------------
   * Public functions called from postmaster follow
***************
*** 1249,1259 **** pgstat_reset_shared_counters(const char *target)
  
  	if (strcmp(target, "bgwriter") == 0)
  		msg.m_resettarget = RESET_BGWRITER;
  	else
  		ereport(ERROR,
  				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
  				 errmsg("unrecognized reset target: \"%s\"", target),
! 				 errhint("Target must be \"bgwriter\".")));
  
  	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
  	pgstat_send(&msg, sizeof(msg));
--- 1251,1263 ----
  
  	if (strcmp(target, "bgwriter") == 0)
  		msg.m_resettarget = RESET_BGWRITER;
+ 	else if (strcmp(target, "socket") == 0)
+ 		msg.m_resettarget = RESET_SOCKET;
  	else
  		ereport(ERROR,
  				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
  				 errmsg("unrecognized reset target: \"%s\"", target),
! 				 errhint("Target must be \"bgwriter\" or \"socket\".")));
  
  	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
  	pgstat_send(&msg, sizeof(msg));
***************
*** 2531,2536 **** pgstat_bestart(void)
--- 2535,2542 ----
  	beentry->st_clienthostname[NAMEDATALEN - 1] = '\0';
  	beentry->st_appname[NAMEDATALEN - 1] = '\0';
  	beentry->st_activity[pgstat_track_activity_query_size - 1] = '\0';
+ 	beentry->st_bytes_sent = 0;
+ 	beentry->st_bytes_received = 0;
  
  	beentry->st_changecount++;
  	Assert((beentry->st_changecount & 1) == 0);
***************
*** 2738,2743 **** pgstat_report_waiting(bool waiting)
--- 2744,2811 ----
  	beentry->st_waiting = waiting;
  }
  
+ /* --------
+  * pgstat_report_commsent() -
+  *
+  *    Tell the collector about "count" bytes of data sent over a socket.
+  *    It is the caller's responsibility not to invoke this with a negative count.
+  * --------
+  */
+ void
+ pgstat_report_commsent(int count)
+ {
+ 	volatile PgBackendStatus *beentry = MyBEEntry;
+ 	PgStat_MsgComm msg;
+ 
+ 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+ 		return;
+ 
+ 	/* may be called by the postmaster, so only update a status entry if we have one */
+ 	if (beentry != NULL) {
+ 		beentry->st_changecount++;
+ 		beentry->st_bytes_sent += count;
+ 		beentry->st_changecount++;
+ 		Assert((beentry->st_changecount & 1) == 0);
+ 	}
+ 
+ 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_COMMSENT);
+ 	/* MyDatabaseId might be invalid; the message receiver checks it */
+ 	msg.m_databaseid = MyDatabaseId;
+ 	msg.m_bytes_transferred = count;
+ 	pgstat_send(&msg, sizeof(msg));
+ }
+ 
+ /* --------
+  * pgstat_report_commreceived() -
+  *
+  *    Tell the collector about "count" bytes of data received from a socket.
+  *    It is the caller's responsibility not to invoke this with a negative count.
+  * --------
+  */
+ void
+ pgstat_report_commreceived(int count)
+ {
+ 	volatile PgBackendStatus *beentry = MyBEEntry;
+ 	PgStat_MsgComm msg;
+ 
+ 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+ 		return;
+ 
+ 	/* may be called by the postmaster, so only update a status entry if we have one */
+ 	if (beentry != NULL) {
+ 		beentry->st_changecount++;
+ 		beentry->st_bytes_received += count;
+ 		beentry->st_changecount++;
+ 		Assert((beentry->st_changecount & 1) == 0);
+ 	}
+ 
+ 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_COMMRECEIVED);
+ 	/* MyDatabaseId might be invalid; the message receiver checks it */
+ 	msg.m_databaseid = MyDatabaseId;
+ 	msg.m_bytes_transferred = count;
+ 	pgstat_send(&msg, sizeof(msg));
+ }
+ 
  
  /* ----------
   * pgstat_read_current_status() -
***************
*** 3290,3295 **** PgstatCollectorMain(int argc, char *argv[])
--- 3358,3371 ----
  					pgstat_recv_tempfile((PgStat_MsgTempFile *) &msg, len);
  					break;
  
+ 				case PGSTAT_MTYPE_COMMSENT:
+ 					pgstat_recv_commsent((PgStat_MsgComm *) &msg, len);
+ 					break;
+ 
+ 				case PGSTAT_MTYPE_COMMRECEIVED:
+ 					pgstat_recv_commreceived((PgStat_MsgComm *) &msg, len);
+ 					break;
+ 
  				default:
  					break;
  			}
***************
*** 3390,3395 **** reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
--- 3466,3473 ----
  	dbentry->n_deadlocks = 0;
  	dbentry->n_block_read_time = 0;
  	dbentry->n_block_write_time = 0;
+ 	dbentry->n_bytes_sent = 0;
+ 	dbentry->n_bytes_received = 0;
  
  	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
  	dbentry->stats_timestamp = 0;
***************
*** 3798,3803 **** pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
--- 3876,3882 ----
  	int32		format_id;
  	bool		found;
  	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
+ 	TimestampTz now;
  
  	/*
  	 * The tables will live in pgStatLocalContext.
***************
*** 3825,3831 **** pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
  	 * Set the current timestamp (will be kept only in case we can't load an
  	 * existing statsfile).
  	 */
! 	globalStats.stat_reset_timestamp = GetCurrentTimestamp();
  
  	/*
  	 * Try to open the stats file. If it doesn't exist, the backends simply
--- 3904,3912 ----
  	 * Set the current timestamp (will be kept only in case we can't load an
  	 * existing statsfile).
  	 */
! 	now = GetCurrentTimestamp();
! 	globalStats.bgwriter_stat_reset_timestamp = now;
! 	globalStats.socket_stat_reset_timestamp = now;
  
  	/*
  	 * Try to open the stats file. If it doesn't exist, the backends simply
***************
*** 4722,4730 **** pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
  {
  	if (msg->m_resettarget == RESET_BGWRITER)
  	{
  		/* Reset the global background writer statistics for the cluster. */
! 		memset(&globalStats, 0, sizeof(globalStats));
! 		globalStats.stat_reset_timestamp = GetCurrentTimestamp();
  	}
  
  	/*
--- 4803,4829 ----
  {
  	if (msg->m_resettarget == RESET_BGWRITER)
  	{
+ 		globalStats.stats_timestamp = 0;
  		/* Reset the global background writer statistics for the cluster. */
! 		globalStats.timed_checkpoints = 0;
! 		globalStats.requested_checkpoints = 0;
! 		globalStats.checkpoint_write_time = 0;
! 		globalStats.checkpoint_sync_time = 0;
! 		globalStats.buf_written_checkpoints = 0;
! 		globalStats.buf_written_clean = 0;
! 		globalStats.maxwritten_clean = 0;
! 		globalStats.buf_written_backend = 0;
! 		globalStats.buf_fsync_backend = 0;
! 		globalStats.buf_alloc = 0;
! 		globalStats.bgwriter_stat_reset_timestamp = GetCurrentTimestamp();
! 	}
! 	else if (msg->m_resettarget == RESET_SOCKET)
! 	{
! 		globalStats.stats_timestamp = 0;
! 		/* Reset the global socket transfer statistics for the cluster. */
! 		globalStats.bytes_sent = 0;
! 		globalStats.bytes_received = 0;
! 		globalStats.socket_stat_reset_timestamp = GetCurrentTimestamp();
  	}
  
  	/*
***************
*** 4951,4956 **** pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
--- 5050,5095 ----
  }
  
  /* ----------
+  * pgstat_recv_commsent() -
+  *
+  *    Process a COMMSENT message: accumulate bytes sent (global and per-database).
+  * ----------
+  */
+ static void
+ pgstat_recv_commsent(PgStat_MsgComm *msg, int len)
+ {
+ 	PgStat_StatDBEntry *dbentry;
+ 
+ 	globalStats.bytes_sent += msg->m_bytes_transferred;
+ 
+ 	/* no valid database OID if sent before connecting to a database, or by a walsender */
+ 	if (OidIsValid(msg->m_databaseid)) {
+ 		dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+ 		dbentry->n_bytes_sent += msg->m_bytes_transferred;
+ 	}
+ }
+ 
+ /* ----------
+  * pgstat_recv_commreceived() -
+  *
+  *    Process a COMMRECEIVED message: accumulate bytes received (global and per-database).
+  * ----------
+  */
+ static void
+ pgstat_recv_commreceived(PgStat_MsgComm *msg, int len)
+ {
+ 	PgStat_StatDBEntry *dbentry;
+ 
+ 	globalStats.bytes_received += msg->m_bytes_transferred;
+ 
+ 	/* no valid database OID if sent before connecting to a database, or by a walsender */
+ 	if (OidIsValid(msg->m_databaseid)) {
+ 		dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+ 		dbentry->n_bytes_received += msg->m_bytes_transferred;
+ 	}
+ }
+ 
+ /* ----------
   * pgstat_recv_funcstat() -
   *
   *	Count what the backend has done.
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 1825,1830 **** retry1:
--- 1825,1832 ----
  					 errmsg("failed to send SSL negotiation response: %m")));
  			return STATUS_ERROR;	/* close the connection */
  		}
+ 		else
+ 			pgstat_report_commsent(1);
  
  #ifdef USE_SSL
  		if (SSLok == 'S' && secure_open_server(port) == -1)
***************
*** 3839,3844 **** report_fork_failure_to_client(Port *port, int errnum)
--- 3841,3849 ----
  	{
  		rc = send(port->sock, buffer, strlen(buffer) + 1, 0);
  	} while (rc < 0 && errno == EINTR);
+ 
+ 	if (rc > 0)
+ 		pgstat_report_commsent(rc);
  }
  
  
*** a/src/backend/utils/adt/pgstatfuncs.c
--- b/src/backend/utils/adt/pgstatfuncs.c
***************
*** 86,91 **** extern Datum pg_stat_get_db_temp_files(PG_FUNCTION_ARGS);
--- 86,93 ----
  extern Datum pg_stat_get_db_temp_bytes(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_db_blk_read_time(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_db_blk_write_time(PG_FUNCTION_ARGS);
+ extern Datum pg_stat_get_db_bytes_sent(PG_FUNCTION_ARGS);
+ extern Datum pg_stat_get_db_bytes_received(PG_FUNCTION_ARGS);
  
  extern Datum pg_stat_get_bgwriter_timed_checkpoints(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_bgwriter_requested_checkpoints(PG_FUNCTION_ARGS);
***************
*** 99,104 **** extern Datum pg_stat_get_buf_written_backend(PG_FUNCTION_ARGS);
--- 101,110 ----
  extern Datum pg_stat_get_buf_fsync_backend(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_buf_alloc(PG_FUNCTION_ARGS);
  
+ extern Datum pg_stat_get_bytes_sent(PG_FUNCTION_ARGS);
+ extern Datum pg_stat_get_bytes_received(PG_FUNCTION_ARGS);
+ extern Datum pg_stat_get_socket_stat_reset_time(PG_FUNCTION_ARGS);
+ 
  extern Datum pg_stat_get_xact_numscans(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_xact_tuples_returned(PG_FUNCTION_ARGS);
  extern Datum pg_stat_get_xact_tuples_fetched(PG_FUNCTION_ARGS);
***************
*** 534,540 **** pg_stat_get_activity(PG_FUNCTION_ARGS)
  
  		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
  
! 		tupdesc = CreateTemplateTupleDesc(14, false);
  		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "datid",
  						   OIDOID, -1, 0);
  		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "pid",
--- 540,546 ----
  
  		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
  
! 		tupdesc = CreateTemplateTupleDesc(16, false);
  		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "datid",
  						   OIDOID, -1, 0);
  		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "pid",
***************
*** 563,568 **** pg_stat_get_activity(PG_FUNCTION_ARGS)
--- 569,578 ----
  						   TEXTOID, -1, 0);
  		TupleDescInitEntry(tupdesc, (AttrNumber) 14, "client_port",
  						   INT4OID, -1, 0);
+ 		TupleDescInitEntry(tupdesc, (AttrNumber) 15, "bytes_sent",
+ 						   INT8OID, -1, 0);
+ 		TupleDescInitEntry(tupdesc, (AttrNumber) 16, "bytes_received",
+ 						   INT8OID, -1, 0);
  
  		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
  
***************
*** 614,621 **** pg_stat_get_activity(PG_FUNCTION_ARGS)
  	if (funcctx->call_cntr < funcctx->max_calls)
  	{
  		/* for each row */
! 		Datum		values[14];
! 		bool		nulls[14];
  		HeapTuple	tuple;
  		PgBackendStatus *beentry;
  		SockAddr	zero_clientaddr;
--- 624,631 ----
  	if (funcctx->call_cntr < funcctx->max_calls)
  	{
  		/* for each row */
! 		Datum		values[16];
! 		bool		nulls[16];
  		HeapTuple	tuple;
  		PgBackendStatus *beentry;
  		SockAddr	zero_clientaddr;
***************
*** 773,778 **** pg_stat_get_activity(PG_FUNCTION_ARGS)
--- 783,790 ----
  					nulls[13] = true;
  				}
  			}
+ 			values[14] = Int64GetDatum(beentry->st_bytes_sent);
+ 			values[15] = Int64GetDatum(beentry->st_bytes_received);
  		}
  		else
  		{
***************
*** 787,792 **** pg_stat_get_activity(PG_FUNCTION_ARGS)
--- 799,806 ----
  			nulls[11] = true;
  			nulls[12] = true;
  			nulls[13] = true;
+ 			nulls[14] = true;
+ 			nulls[15] = true;
  		}
  
  		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
***************
*** 1407,1412 **** pg_stat_get_db_blk_write_time(PG_FUNCTION_ARGS)
--- 1421,1456 ----
  }
  
  Datum
+ pg_stat_get_db_bytes_sent(PG_FUNCTION_ARGS)
+ {
+ 	Oid			dbid = PG_GETARG_OID(0);	/* database to report on */
+ 	int64		result;		/* byte total; int64 to match the bigint return type */
+ 	PgStat_StatDBEntry *dbentry;
+ 
+ 	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+ 		result = 0;		/* no stats entry found for this database */
+ 	else
+ 		result = (int64) (dbentry->n_bytes_sent);
+ 
+ 	PG_RETURN_INT64(result);
+ }
+ 
+ Datum
+ pg_stat_get_db_bytes_received(PG_FUNCTION_ARGS)
+ {
+ 	Oid			dbid = PG_GETARG_OID(0);	/* database to report on */
+ 	int64		result;		/* byte total; int64 to match the bigint return type */
+ 	PgStat_StatDBEntry *dbentry;
+ 
+ 	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+ 		result = 0;		/* no stats entry found for this database */
+ 	else
+ 		result = (int64) (dbentry->n_bytes_received);
+ 
+ 	PG_RETURN_INT64(result);
+ }
+ 
+ Datum
  pg_stat_get_bgwriter_timed_checkpoints(PG_FUNCTION_ARGS)
  {
  	PG_RETURN_INT64(pgstat_fetch_global()->timed_checkpoints);
***************
*** 1453,1459 **** pg_stat_get_checkpoint_sync_time(PG_FUNCTION_ARGS)
  Datum
  pg_stat_get_bgwriter_stat_reset_time(PG_FUNCTION_ARGS)
  {
! 	PG_RETURN_TIMESTAMPTZ(pgstat_fetch_global()->stat_reset_timestamp);
  }
  
  Datum
--- 1497,1503 ----
  Datum
  pg_stat_get_bgwriter_stat_reset_time(PG_FUNCTION_ARGS)
  {
! 	PG_RETURN_TIMESTAMPTZ(pgstat_fetch_global()->bgwriter_stat_reset_timestamp);
  }
  
  Datum
***************
*** 1475,1480 **** pg_stat_get_buf_alloc(PG_FUNCTION_ARGS)
--- 1519,1542 ----
  }
  
  Datum
+ pg_stat_get_bytes_sent(PG_FUNCTION_ARGS)
+ {
+ 	PG_RETURN_INT64(pgstat_fetch_global()->bytes_sent);
+ }
+ 
+ Datum
+ pg_stat_get_bytes_received(PG_FUNCTION_ARGS)
+ {
+ 	PG_RETURN_INT64(pgstat_fetch_global()->bytes_received);
+ }
+ 
+ Datum
+ pg_stat_get_socket_stat_reset_time(PG_FUNCTION_ARGS)
+ {
+ 	PG_RETURN_TIMESTAMPTZ(pgstat_fetch_global()->socket_stat_reset_timestamp);
+ }
+ 
+ Datum
  pg_stat_get_xact_numscans(PG_FUNCTION_ARGS)
  {
  	Oid			relid = PG_GETARG_OID(0);
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 2626,2632 **** DATA(insert OID = 3057 ( pg_stat_get_autoanalyze_count PGNSP PGUID 12 1 0 0 0 f
  DESCR("statistics: number of auto analyzes for a table");
  DATA(insert OID = 1936 (  pg_stat_get_backend_idset		PGNSP PGUID 12 1 100 0 0 f f f f t t s 0 0 23 "" _null_ _null_ _null_ _null_ pg_stat_get_backend_idset _null_ _null_ _null_ ));
  DESCR("statistics: currently active backend IDs");
! DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,25,16,1184,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,pid,usesysid,application_name,state,query,waiting,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active backends");
  DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active replication");
--- 2626,2632 ----
  DESCR("statistics: number of auto analyzes for a table");
  DATA(insert OID = 1936 (  pg_stat_get_backend_idset		PGNSP PGUID 12 1 100 0 0 f f f f t t s 0 0 23 "" _null_ _null_ _null_ _null_ pg_stat_get_backend_idset _null_ _null_ _null_ ));
  DESCR("statistics: currently active backend IDs");
! DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,25,16,1184,1184,1184,1184,869,25,23,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,pid,usesysid,application_name,state,query,waiting,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,bytes_sent,bytes_received}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active backends");
  DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active replication");
***************
*** 2696,2701 **** DATA(insert OID = 2844 (  pg_stat_get_db_blk_read_time	PGNSP PGUID 12 1 0 0 0 f
--- 2696,2705 ----
  DESCR("statistics: block read time, in msec");
  DATA(insert OID = 2845 (  pg_stat_get_db_blk_write_time PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 701 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_blk_write_time _null_ _null_ _null_ ));
  DESCR("statistics: block write time, in msec");
+ DATA(insert OID = 3195 (  pg_stat_get_db_bytes_sent PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 20 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_bytes_sent _null_ _null_ _null_ ));
+ DESCR("statistics: number of bytes sent over communication sockets");
+ DATA(insert OID = 3196 (  pg_stat_get_db_bytes_received PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 20 "26" _null_ _null_ _null_ _null_ pg_stat_get_db_bytes_received _null_ _null_ _null_ ));
+ DESCR("statistics: number of bytes received over communication sockets");
  DATA(insert OID = 2769 ( pg_stat_get_bgwriter_timed_checkpoints PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 20 "" _null_ _null_ _null_ _null_ pg_stat_get_bgwriter_timed_checkpoints _null_ _null_ _null_ ));
  DESCR("statistics: number of timed checkpoints started by the bgwriter");
  DATA(insert OID = 2770 ( pg_stat_get_bgwriter_requested_checkpoints PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 20 "" _null_ _null_ _null_ _null_ pg_stat_get_bgwriter_requested_checkpoints _null_ _null_ _null_ ));
***************
*** 2719,2724 **** DESCR("statistics: number of backend buffer writes that did their own fsync");
--- 2723,2735 ----
  DATA(insert OID = 2859 ( pg_stat_get_buf_alloc			PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 20 "" _null_ _null_ _null_ _null_ pg_stat_get_buf_alloc _null_ _null_ _null_ ));
  DESCR("statistics: number of buffer allocations");
  
+ DATA(insert OID = 3197 ( pg_stat_get_bytes_sent PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 20 "" _null_ _null_ _null_ _null_ pg_stat_get_bytes_sent _null_ _null_ _null_ ));
+ DESCR("statistics: number of bytes sent over communication sockets");
+ DATA(insert OID = 3198 ( pg_stat_get_bytes_received PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 20 "" _null_ _null_ _null_ _null_ pg_stat_get_bytes_received _null_ _null_ _null_ ));
+ DESCR("statistics: number of bytes received over communication sockets");
+ DATA(insert OID = 3199 ( pg_stat_get_socket_stat_reset_time PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 1184 "" _null_ _null_ _null_ _null_	pg_stat_get_socket_stat_reset_time _null_ _null_ _null_ ));
+ DESCR("statistics: last reset for the communication socket statistics");
+ 
  DATA(insert OID = 2978 (  pg_stat_get_function_calls		PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 20 "26" _null_ _null_ _null_ _null_ pg_stat_get_function_calls _null_ _null_ _null_ ));
  DESCR("statistics: number of function calls");
  DATA(insert OID = 2979 (  pg_stat_get_function_total_time	PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 701 "26" _null_ _null_ _null_ _null_ pg_stat_get_function_total_time _null_ _null_ _null_ ));
*** a/src/include/pgstat.h
--- b/src/include/pgstat.h
***************
*** 49,55 **** typedef enum StatMsgType
  	PGSTAT_MTYPE_FUNCPURGE,
  	PGSTAT_MTYPE_RECOVERYCONFLICT,
  	PGSTAT_MTYPE_TEMPFILE,
! 	PGSTAT_MTYPE_DEADLOCK
  } StatMsgType;
  
  /* ----------
--- 49,57 ----
  	PGSTAT_MTYPE_FUNCPURGE,
  	PGSTAT_MTYPE_RECOVERYCONFLICT,
  	PGSTAT_MTYPE_TEMPFILE,
! 	PGSTAT_MTYPE_DEADLOCK,
! 	PGSTAT_MTYPE_COMMSENT,
! 	PGSTAT_MTYPE_COMMRECEIVED
  } StatMsgType;
  
  /* ----------
***************
*** 102,108 **** typedef struct PgStat_TableCounts
  /* Possible targets for resetting cluster-wide shared values */
  typedef enum PgStat_Shared_Reset_Target
  {
! 	RESET_BGWRITER
  } PgStat_Shared_Reset_Target;
  
  /* Possible object types for resetting single counters */
--- 104,111 ----
  /* Possible targets for resetting cluster-wide shared values */
  typedef enum PgStat_Shared_Reset_Target
  {
! 	RESET_BGWRITER,
! 	RESET_SOCKET
  } PgStat_Shared_Reset_Target;
  
  /* Possible object types for resetting single counters */
***************
*** 397,402 **** typedef struct PgStat_MsgTempFile
--- 400,419 ----
  } PgStat_MsgTempFile;
  
  /* ----------
+  * PgStat_MsgComm			Sent after sending or receiving data over a
+  *						communication socket; the message type in the
+  *						header determines the direction
+  * ----------
+  */
+ typedef struct PgStat_MsgComm
+ {
+ 	PgStat_MsgHdr	m_hdr;
+ 
+ 	Oid			m_databaseid;	/* sender's MyDatabaseId; may be invalid */
+ 	int			m_bytes_transferred;	/* number of bytes sent or received */
+ } PgStat_MsgComm;
+ 
+ /* ----------
   * PgStat_FunctionCounts	The actual per-function counts kept by a backend
   *
   * This struct should contain only actual event counters, because we memcmp
***************
*** 515,521 **** typedef union PgStat_Msg
   * ------------------------------------------------------------
   */
  
! #define PGSTAT_FILE_FORMAT_ID	0x01A5BC9B
  
  /* ----------
   * PgStat_StatDBEntry			The collector's data per database
--- 532,538 ----
   * ------------------------------------------------------------
   */
  
! #define PGSTAT_FILE_FORMAT_ID	0x01A5BC9C
  
  /* ----------
   * PgStat_StatDBEntry			The collector's data per database
***************
*** 544,549 **** typedef struct PgStat_StatDBEntry
--- 561,570 ----
  	PgStat_Counter n_deadlocks;
  	PgStat_Counter n_block_read_time;	/* times in microseconds */
  	PgStat_Counter n_block_write_time;
+ 	/* communication socket transfer counter in bytes (backend to client) */
+ 	PgStat_Counter n_bytes_sent;
+ 	/* communication socket transfer counter in bytes (client to backend) */
+ 	PgStat_Counter n_bytes_received;
  
  	TimestampTz stat_reset_timestamp;
  	TimestampTz stats_timestamp;	/* time of db stats file update */
***************
*** 614,619 **** typedef struct PgStat_StatFuncEntry
--- 635,641 ----
  typedef struct PgStat_GlobalStats
  {
  	TimestampTz stats_timestamp;	/* time of stats file update */
+ 	/* bgwriter stats */
  	PgStat_Counter timed_checkpoints;
  	PgStat_Counter requested_checkpoints;
  	PgStat_Counter checkpoint_write_time;		/* times in milliseconds */
***************
*** 624,630 **** typedef struct PgStat_GlobalStats
  	PgStat_Counter buf_written_backend;
  	PgStat_Counter buf_fsync_backend;
  	PgStat_Counter buf_alloc;
! 	TimestampTz stat_reset_timestamp;
  } PgStat_GlobalStats;
  
  
--- 646,656 ----
  	PgStat_Counter buf_written_backend;
  	PgStat_Counter buf_fsync_backend;
  	PgStat_Counter buf_alloc;
! 	TimestampTz bgwriter_stat_reset_timestamp;
! 	/* communication socket stats */
! 	PgStat_Counter bytes_sent; /* in bytes (cluster to client) */
! 	PgStat_Counter bytes_received; /* in bytes (client to cluster) */
! 	TimestampTz socket_stat_reset_timestamp;
  } PgStat_GlobalStats;
  
  
***************
*** 697,702 **** typedef struct PgBackendStatus
--- 723,734 ----
  
  	/* current command string; MUST be null-terminated */
  	char	   *st_activity;
+ 
+ 	/* communication socket transfer counter in bytes (backend to client) */
+ 	unsigned long st_bytes_sent;
+ 	/* communication socket transfer counter in bytes (client to backend) */
+ 	unsigned long st_bytes_received;
+ 
  } PgBackendStatus;
  
  /*
***************
*** 788,793 **** extern void pgstat_report_tempfile(size_t filesize);
--- 820,828 ----
  extern void pgstat_report_appname(const char *appname);
  extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
  extern void pgstat_report_waiting(bool waiting);
+ extern void pgstat_report_commsent(int count);
+ extern void pgstat_report_commreceived(int count);
+ 
  extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
  extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer,
  									int buflen);
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
***************
*** 1595,1603 **** pg_stat_activity| SELECT s.datid,
      s.state_change, 
      s.waiting, 
      s.state, 
!     s.query
     FROM pg_database d, 
!     pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port), 
      pg_authid u
    WHERE ((s.datid = d.oid) AND (s.usesysid = u.oid));
  pg_stat_all_indexes| SELECT c.oid AS relid, 
--- 1595,1605 ----
      s.state_change, 
      s.waiting, 
      s.state, 
!     s.query, 
!     s.bytes_sent, 
!     s.bytes_received
     FROM pg_database d, 
!     pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, bytes_sent, bytes_received), 
      pg_authid u
    WHERE ((s.datid = d.oid) AND (s.usesysid = u.oid));
  pg_stat_all_indexes| SELECT c.oid AS relid, 
***************
*** 1669,1674 **** pg_stat_database| SELECT d.oid AS datid,
--- 1671,1678 ----
      pg_stat_get_db_deadlocks(d.oid) AS deadlocks, 
      pg_stat_get_db_blk_read_time(d.oid) AS blk_read_time, 
      pg_stat_get_db_blk_write_time(d.oid) AS blk_write_time, 
+     pg_stat_get_db_bytes_sent(d.oid) AS bytes_sent, 
+     pg_stat_get_db_bytes_received(d.oid) AS bytes_received, 
      pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset
     FROM pg_database d;
  pg_stat_database_conflicts| SELECT d.oid AS datid, 
***************
*** 1687,1692 **** pg_stat_replication| SELECT s.pid,
--- 1691,1698 ----
      s.client_hostname, 
      s.client_port, 
      s.backend_start, 
+     s.bytes_sent, 
+     s.bytes_received, 
      w.state, 
      w.sent_location, 
      w.write_location, 
***************
*** 1694,1703 **** pg_stat_replication| SELECT s.pid,
      w.replay_location, 
      w.sync_priority, 
      w.sync_state
!    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port), 
      pg_authid u, 
      pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)
    WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid));
  pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, 
      pg_stat_all_indexes.indexrelid, 
      pg_stat_all_indexes.schemaname, 
--- 1700,1712 ----
      w.replay_location, 
      w.sync_priority, 
      w.sync_state
!    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, bytes_sent, bytes_received), 
      pg_authid u, 
      pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)
    WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid));
+ pg_stat_socket| SELECT pg_stat_get_bytes_sent() AS bytes_sent, 
+     pg_stat_get_bytes_received() AS bytes_received, 
+     pg_stat_get_socket_stat_reset_time() AS stats_reset;
  pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, 
      pg_stat_all_indexes.indexrelid, 
      pg_stat_all_indexes.schemaname, 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to