On Fri, May 19, 2017 at 3:01 PM, Masahiko Sawada <[email protected]> wrote:
> Also, as Horiguchi-san pointed out earlier, walreceiver seems need the
> similar fix.
Actually, now that I look at it, ready_to_display should also be
protected by the WAL receiver's spinlock, so it is incorrectly placed
in walreceiver.h: it is declared after the mutex, not among the fields
that the mutex covers. As you point out, pg_stat_get_wal_receiver()
is lazy as well (it checks pid and ready_to_display without holding
the lock), and that's new in 10, so we have an open item here for both
of them. And I am the author of both things. I did not spot any issues
in walreceiverfuncs.c after review.
I am adding an open item so that both issues get fixed in PG10. Together
with the WAL sender part, I think that all of this should be fixed in one go.
So what do you think about the attached?
--
Michael
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ad213fc454..5ab5e95fa9 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -409,15 +409,23 @@ void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
+ WalSnd *walsnd = MyWalSnd;
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
+ XLogRecPtr flush;
+ WalSndState state;
bool got_recptr;
bool am_sync;
int numwrite = 0;
int numflush = 0;
int numapply = 0;
+ SpinLockAcquire(&walsnd->mutex);
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/*
* If this WALSender is serving a standby that is not on the list of
* potential sync standbys then we have nothing to do. If we are still
@@ -425,8 +433,8 @@ SyncRepReleaseWaiters(void)
* still invalid, then leave quickly also.
*/
if (MyWalSnd->sync_standby_priority == 0 ||
- MyWalSnd->state < WALSNDSTATE_STREAMING ||
- XLogRecPtrIsInvalid(MyWalSnd->flush))
+ state < WALSNDSTATE_STREAMING ||
+ XLogRecPtrIsInvalid(flush))
{
announce_next_takeover = true;
return;
@@ -711,14 +719,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr flush;
+ WalSndState state;
+ int pid;
+
walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/* Must be active */
- if (walsnd->pid == 0)
+ if (pid == 0)
continue;
/* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
+ if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
@@ -726,7 +744,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
+ if (XLogRecPtrIsInvalid(flush))
continue;
/*
@@ -780,14 +798,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
*/
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr flush;
+ WalSndState state;
+ int pid;
+
walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/* Must be active */
- if (walsnd->pid == 0)
+ if (pid == 0)
continue;
/* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
+ if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
@@ -796,7 +824,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
+ if (XLogRecPtrIsInvalid(flush))
continue;
/*
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2723612718..25f12e0706 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1392,23 +1392,13 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
TimestampTz latest_end_time;
char *slotname;
char *conninfo;
-
- /*
- * No WAL receiver (or not ready yet), just return a tuple with NULL
- * values
- */
- if (walrcv->pid == 0 || !walrcv->ready_to_display)
- PG_RETURN_NULL();
-
- /* determine result type */
- if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
- elog(ERROR, "return type must be a row type");
-
- values = palloc0(sizeof(Datum) * tupdesc->natts);
- nulls = palloc0(sizeof(bool) * tupdesc->natts);
+ int pid;
+ bool ready_to_display;
/* Take a lock to ensure value consistency */
SpinLockAcquire(&walrcv->mutex);
+ pid = walrcv->pid;
+ ready_to_display = walrcv->ready_to_display;
state = walrcv->walRcvState;
receive_start_lsn = walrcv->receiveStart;
receive_start_tli = walrcv->receiveStartTLI;
@@ -1422,8 +1412,22 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
conninfo = pstrdup(walrcv->conninfo);
SpinLockRelease(&walrcv->mutex);
+ /*
+ * No WAL receiver (or not ready yet), just return a tuple with NULL
+ * values
+ */
+ if (pid == 0 || !ready_to_display)
+ PG_RETURN_NULL();
+
+ /* determine result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ values = palloc0(sizeof(Datum) * tupdesc->natts);
+ nulls = palloc0(sizeof(bool) * tupdesc->natts);
+
/* Fetch values */
- values[0] = Int32GetDatum(walrcv->pid);
+ values[0] = Int32GetDatum(pid);
if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 49cce38880..a04be8039d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2874,10 +2874,12 @@ WalSndRqstFileReload(void)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
continue;
-
- SpinLockAcquire(&walsnd->mutex);
+ }
walsnd->needreload = true;
SpinLockRelease(&walsnd->mutex);
}
@@ -3190,14 +3192,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
TimeOffset flushLag;
TimeOffset applyLag;
int priority;
+ int pid;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+ SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
continue;
-
- SpinLockAcquire(&walsnd->mutex);
+ }
+ pid = walsnd->pid;
sentPtr = walsnd->sentPtr;
state = walsnd->state;
write = walsnd->write;
@@ -3210,7 +3216,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
- values[0] = Int32GetDatum(walsnd->pid);
+ values[0] = Int32GetDatum(pid);
if (!superuser())
{
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 31d090c99d..44199670b1 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -114,6 +114,9 @@ typedef struct
*/
char slotname[NAMEDATALEN];
+ /* set true once conninfo is ready to display (obfuscated pwds etc) */
+ bool ready_to_display;
+
slock_t mutex; /* locks shared variables shown above */
/*
@@ -122,9 +125,6 @@ typedef struct
*/
bool force_reply;
- /* set true once conninfo is ready to display (obfuscated pwds etc) */
- bool ready_to_display;
-
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers