On 2021-03-19 16:30, Fujii Masao wrote:
> On 2021/03/15 10:39, Masahiro Ikeda wrote:
>> Thanks, I understand now that get_sync_bit() checks the sync flags and
>> that the write units of generated WAL data and replicated WAL data
>> differ.
>> (Whether to use the kernel cache or not is an interesting optimization.)
>> 
>> OK. Although I agree to separate the stats for the walreceiver,
>> I want to hear opinions from other people too. I didn't change the 
>> patch.
>> 
>> Please feel free to comment.
> 
> What about applying the patch for the common WAL write function, like
> XLogWriteFile(), separately from the patch for walreceiver's stats?
> The former seems to have reached consensus, so we can commit it first.
> Also, even the former change alone is useful because it allows
> walreceiver to report the WALWrite wait event.

Agreed. I separated the patches.

If only the former is committed, my minor concern is that the standby
server gets a disadvantage but no advantage. Calling
INSTR_TIME_SET_CURRENT() may degrade the performance of the wal receiver,
but the stats won't be visible to users until the latter patch is
committed.

I think it's OK because this doesn't happen when "track_wal_io_timing" is
disabled on the standby server. Some users may start the standby server
from a backup taken while "track_wal_io_timing" was enabled on the primary
server, but they will likely find it acceptable since they have already
accepted the performance degradation on the primary server.

>> OK. I agree.
>> 
>> I wonder whether we should change how errors are checked depending on
>> who calls this function?
>> Now, only the walreceiver checks (1) errno==0 and doesn't check 
>> (2) errno==EINTR.
>> Other processes are the opposite.
>> 
>> IIUC, it's appropriate that every process checks both (1) and (2).
>> Please let me know if my understanding is wrong.
> 
> I'm thinking the same. Regarding (2), commit 79ce29c734 introduced
> that code. According to the following commit log, it seems harmless
> to retry on EINTR even in walreceiver.
> 
>     Also retry on EINTR. All signals used in the backend are flagged
>     SA_RESTART nowadays, so it shouldn't happen, but better to be defensive.

Thanks, I understood.
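
For reference, the combined check (retry on EINTR, and assume ENOSPC when
errno is left at 0) can be sketched outside PostgreSQL roughly as below.
write_all_at() and demo_roundtrip() are hypothetical names used only for
illustration, plain pwrite() stands in for pg_pwrite(), and the sketch
returns -1 instead of raising PANIC.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: write exactly 'len' bytes of 'buf' at 'offset'.
 * Returns 0 on success, or -1 with errno set on failure.
 */
static int
write_all_at(int fd, const char *buf, size_t len, off_t offset)
{
	assert(buf != NULL);

	while (len > 0)
	{
		ssize_t		n;

		errno = 0;
		n = pwrite(fd, buf, len, offset);
		if (n <= 0)
		{
			/* (2) An interrupted call is simply retried. */
			if (errno == EINTR)
				continue;

			/* (1) If the write didn't set errno, assume no disk space. */
			if (errno == 0)
				errno = ENOSPC;
			return -1;
		}

		/* Advance past the bytes written so far, including the offset. */
		buf += n;
		len -= (size_t) n;
		offset += n;
	}
	return 0;
}

/* Write to a temporary file and read the data back; 0 means success. */
static int
demo_roundtrip(void)
{
	char		path[] = "/tmp/write_all_at_XXXXXX";
	char		out[5];
	int			fd = mkstemp(path);
	int			ok;

	if (fd < 0)
		return -1;
	ok = write_all_at(fd, "hello", 5, 3) == 0 &&
		pread(fd, out, sizeof(out), 3) == 5 &&
		memcmp(out, "hello", 5) == 0;
	close(fd);
	unlink(path);
	return ok ? 0 : -1;
}
```

Both checks live in one place here, so every caller gets the same
behavior regardless of which process it runs in.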


>>> BTW, currently XLogWrite() increments IO timing even when pg_pwrite()
>>> reports an error. But this is useless. Probably IO timing should be
>>> incremented after the return code of pg_pwrite() is checked, instead?
>> 
>> Yes, I agree. I fixed it.
>> (v18-0003-Makes-the-wal-receiver-report-WAL-statistics.patch)
> 
> Thanks for the patch!
> 
>                       nleft = nbytes;
>                       do
>                       {
> -                             errno = 0;
> +                             written = XLogWriteFile(openLogFile, from, nleft, (off_t) startoffset,
> +                                                     ThisTimeLineID, openLogSegNo, wal_segment_size);
> 
> Can we merge this do-while loop in XLogWrite() into the loop
> in XLogWriteFile()?
> If we do that, ISTM that the following code is not necessary in
> XLogWrite().
> 
>                               nleft -= written;
>                               from += written;

OK, I fixed it.


> + * 'segsize' is a segment size of WAL segment file.
> 
> Since segsize is always wal_segment_size, segsize argument seems
> not necessary in XLogWriteFile().

Right. I fixed it.


> +XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset,
> +                       TimeLineID timelineid, XLogSegNo segno, int segsize)
> 
> Why did you use "const void *" instead of "char *" for *buf?

I followed the argument type of pg_pwrite().
But I think "char *" is better, so I fixed it.


> Regarding 0005 patch, I will review it later.

Thanks.


Regards,
-- 
Masahiro Ikeda
NTT DATA CORPORATION



diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a7a94d2a83..df028c5039 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -771,6 +771,9 @@ WalRcvDie(int code, Datum arg)
 	/* Ensure that all WAL records received are flushed to disk */
 	XLogWalRcvFlush(true);
 
+	/* Send WAL statistics to the stats collector before terminating */
+	pgstat_send_wal(true);
+
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -910,6 +913,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 					XLogArchiveForceDone(xlogfname);
 				else
 					XLogArchiveNotify(xlogfname);
+
+				/*
+				 * Send WAL statistics to the stats collector when finishing
+				 * the current WAL segment file to avoid overloading it.
+				 */
+				pgstat_send_wal(false);
 			}
 			recvFile = -1;
 



diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bd5e787e55..4c7d90f1b9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2536,61 +2536,14 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			Size		nbytes;
 			Size		nleft;
 			int			written;
-			instr_time	start;
 
 			/* OK to write the page(s) */
 			from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
 			nbytes = npages * (Size) XLOG_BLCKSZ;
 			nleft = nbytes;
-			do
-			{
-				errno = 0;
-
-				/* Measure I/O timing to write WAL data */
-				if (track_wal_io_timing)
-					INSTR_TIME_SET_CURRENT(start);
-
-				pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
-				written = pg_pwrite(openLogFile, from, nleft, startoffset);
-				pgstat_report_wait_end();
-
-				/*
-				 * Increment the I/O timing and the number of times WAL data
-				 * were written out to disk.
-				 */
-				if (track_wal_io_timing)
-				{
-					instr_time	duration;
-
-					INSTR_TIME_SET_CURRENT(duration);
-					INSTR_TIME_SUBTRACT(duration, start);
-					WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
-				}
-
-				WalStats.m_wal_write++;
-
-				if (written <= 0)
-				{
-					char		xlogfname[MAXFNAMELEN];
-					int			save_errno;
-
-					if (errno == EINTR)
-						continue;
-
-					save_errno = errno;
-					XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo,
-								 wal_segment_size);
-					errno = save_errno;
-					ereport(PANIC,
-							(errcode_for_file_access(),
-							 errmsg("could not write to log file %s "
-									"at offset %u, length %zu: %m",
-									xlogfname, startoffset, nleft)));
-				}
-				nleft -= written;
-				from += written;
-				startoffset += written;
-			} while (nleft > 0);
+			written = XLogWriteFile(openLogFile, from, nleft, (off_t) startoffset,
+									ThisTimeLineID, openLogSegNo, true);
+			startoffset += written;
 
 			npages = 0;
 
@@ -2707,6 +2660,94 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	}
 }
 
+/*
+ * Issue pg_pwrite() to write to a WAL segment file.
+ *
+ * 'fd' is the file descriptor of the WAL segment file to write to.
+ * 'buf' is the starting address of the buffer to write.
+ * 'nbyte' is the maximum number of bytes to write.
+ * 'offset' is the offset within the WAL segment file to write at.
+ * 'timelineid' is the timeline ID of the WAL segment file.
+ * 'segno' is the segment number of the WAL segment file.
+ * 'write_all' is whether to write exactly 'nbyte' bytes.
+ *
+ * Return the number of bytes written.
+ */
+int
+XLogWriteFile(int fd, char *buf, size_t nbyte, off_t offset,
+			  TimeLineID timelineid, XLogSegNo segno, bool write_all)
+{
+	int			written = 0;
+
+	/*
+	 * Loop until the buffer data is written or an error occurs.
+	 */
+	for (;;)
+	{
+		int			written_tmp;
+		instr_time	start;
+
+		errno = 0;
+
+		/* Measure I/O timing to write WAL data */
+		if (track_wal_io_timing)
+			INSTR_TIME_SET_CURRENT(start);
+
+		pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
+		written_tmp = pg_pwrite(fd, buf, nbyte, offset);
+		pgstat_report_wait_end();
+
+		if (written_tmp <= 0)
+		{
+			char		xlogfname[MAXFNAMELEN];
+			int			save_errno;
+
+			/*
+			 * Retry on EINTR. All signals used in the backend and background
+			 * processes are flagged SA_RESTART, so it shouldn't happen, but
+			 * better to be defensive.
+			 */
+			if (errno == EINTR)
+				continue;
+
+			/* if write didn't set errno, assume no disk space */
+			if (errno == 0)
+				errno = ENOSPC;
+
+			save_errno = errno;
+			XLogFileName(xlogfname, timelineid, segno, wal_segment_size);
+			errno = save_errno;
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not write to log file %s "
+							"at offset %u, length %zu: %m",
+							xlogfname, (unsigned int) offset, nbyte)));
+		}
+
+		/*
+		 * Increment the I/O timing and the number of times WAL data were
+		 * written out to disk.
+		 */
+		if (track_wal_io_timing)
+		{
+			instr_time	duration;
+
+			INSTR_TIME_SET_CURRENT(duration);
+			INSTR_TIME_SUBTRACT(duration, start);
+			WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
+		}
+
+		WalStats.m_wal_write++;
+		nbyte -= written_tmp;
+		buf += written_tmp;
+		offset += written_tmp;
+		written += written_tmp;
+
+		if (!write_all || nbyte <= 0)
+			return written;
+	}
+}
+
 /*
  * Record the LSN for an asynchronous transaction commit/abort
  * and nudge the WALWriter if there is work for it to do.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a7a94d2a83..daf764446f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -868,7 +868,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 static void
 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 {
-	int			startoff;
+	uint32		startoff;
 	int			byteswritten;
 
 	while (nbytes > 0)
@@ -929,27 +929,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 			segbytes = nbytes;
 
 		/* OK to write the logs */
-		errno = 0;
-
-		byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
-		if (byteswritten <= 0)
-		{
-			char		xlogfname[MAXFNAMELEN];
-			int			save_errno;
-
-			/* if write didn't set errno, assume no disk space */
-			if (errno == 0)
-				errno = ENOSPC;
-
-			save_errno = errno;
-			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-			errno = save_errno;
-			ereport(PANIC,
-					(errcode_for_file_access(),
-					 errmsg("could not write to log segment %s "
-							"at offset %u, length %lu: %m",
-							xlogfname, startoff, (unsigned long) segbytes)));
-		}
+		byteswritten = XLogWriteFile(recvFile, buf, segbytes, (off_t) startoff,
+									 recvFileTLI, recvSegNo, false);
 
 		/* Update state for write */
 		recptr += byteswritten;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 77187c12be..b562cfa4c1 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -298,6 +298,10 @@ extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
 extern int	XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
 extern int	XLogFileOpen(XLogSegNo segno);
+extern int	XLogWriteFile(int fd, char *buf,
+						  size_t nbyte, off_t offset,
+						  TimeLineID timelineid, XLogSegNo segno,
+						  bool write_all);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern XLogSegNo XLogGetLastRemovedSegno(void);


