Hi,

Since the last patch version I've done a number of experiments with this
throttling idea, so let me share some of the ideas and results, and see
where that gets us.

The patch versions so far tied everything to syncrep - commit latency
with sync replica was the original motivation, so this makes sense. But
while thinking about this and discussing this with a couple people, I've
been wondering why to limit this to just that particular option. There's
a couple other places in the WAL write path where we might do a similar
thing (i.e. wait) or be a bit more aggressive (and do a write/flush),
depending on circumstances.

If I simplify this a bit, there are about 3 WAL positions that I could
think of:

- write LSN (how far we wrote WAL to disk)
- flush LSN (how far we flushed WAL to local disk)
- syncrep LSN (how far the sync replica confirmed WAL)

So, why couldn't there be a similar "throttling threshold" for these
events too? Imagine we have three GUCs, with values satisfying this:

  wal_write_after < wal_flush_after_local < wal_flush_after_remote

and this meaning:

  wal_write_after - if a backend generates this amount of WAL, it will
                    write the completed WAL (but only whole pages)

  wal_flush_after_local - if a backend generates this amount of WAL, it
                          will not only write the WAL, but also issue a
                          flush (if still needed)

  wal_flush_after_remote - if this amount of WAL is generated, it will
                           wait for syncrep to confirm the flushed LSN

The attached PoC patch does this, mostly the same way as earlier
patches. XLogInsertRecord is where the decision whether throttling may
be needed is done, HandleXLogDelayPending then does the actual work
(writing WAL, flushing it, waiting for syncrep).

The one new thing HandleXLogDelayPending also does is auto-tuning the
values a bit. The idea is that with per-backend threshold, it's hard to
enforce some sort of global limit, because if depends on the number of
active backends. If you set 1MB of WAL per backend, the total might be
1MB or 1000MB, if there are 1000 backends. Who knows. So this tries to
reduce the threshold (if the backend generated only a tiny fraction of
the WAL), or increase the threshold (if it generated most of it). I'm
not entirely sure this behaves sanely under all circumstances, but for a
PoC patch it seems OK.

The first two GUCs remind me what walwriter is doing, and I've been
asking myself if maybe making it more aggressive would have the same
effect. But I don't think so, because a big part of this throttling
patch is ... well, throttling. Making the backends sleep for a bit (or
wait for something), to slow it down. And walwriter doesn't really do
that I think.


In a recent off-list discussion, someone asked if maybe this might be
useful to prevent emergencies due to archiver not keeping up and WAL
filling disk. A bit like enforcing a more "strict" limit on WAL than the
current max_wal_size GUC. I'm not sure about that, it's certainly a very
different use case than minimizing impact on OLTP latency. But it seems
like "archived LSN" might be another "event" the backends would wait
for, just like they wait for syncrep to confirm a LSN. Ideally it'd
never happen, ofc, and it seems a bit like a great footgun (outage on
archiver may kill PROD), but if you're at risk of ENOSPACE on pg_wal,
not doing anything may be risky too ...

FWIW I wonder if maybe we should frame this a as a QoS feature, where
instead of "minimize impact of bulk loads" we'd try to "guarantee" or
"reserve" some part of the capacity to certain backends/...


Now, let's look at results from some of the experiments. I wanted to see
how effective this approach could be in minimizing impact of large bulk
loads at small OLTP transactions in different setups. Thanks to the two
new GUCs this is not strictly about syncrep, so I decided to try three
cases:

1) local, i.e. single-node instance

2) syncrep on the same switch, with 0.1ms latency (1Gbit)

2) syncrep with 10ms latency (also 1Gbit)

And for each configuration I did ran a pgbench (30 minutes), either on
it's own, or concurrently with bulk COPY of 1GB data. The load was done
either by a single backend (so one backend loading 1GB of data), or the
file was split into 10 files 100MB each, and this was loaded by 10
concurrent backends.

And I did this test with three configurations:

(a) master - unpatched, current behavior

(b) throttle-1: patched with limits set like this:

   # Add settings for extensions here
   wal_write_after = '8kB'
   wal_flush_after_local = '16kB'
   wal_flush_after_remote = '32kB'

(c) throttle-2: patched with throttling limits set to 4x of (b), i.e.

   # Add settings for extensions here
   wal_write_after = '32kB'
   wal_flush_after_local = '64kB'
   wal_flush_after_remote = '128kB'

And I did this for the traditional three scales (small, medium, large),
to hit different bottlenecks. And of course, I measured both throughput
and latencies.

The full results are available here:

[1] https://github.com/tvondra/wal-throttle-results/tree/master

I'm not going to attach the files visualizing the results here, because
it's like 1MB per file, which is not great for e-mail.


https://github.com/tvondra/wal-throttle-results/blob/master/wal-throttling.pdf
----------------------------------------------------------------------

The first file summarizes the throughput results for the three
configurations, different scales etc. On the left is throughput, on the
right is the number of load cycles completed.

I think this behaves mostly as expected - with the bulk loads, the
throughput drops. How much depends on the configuration (for syncrep
it's far more pronounced). The throttling recovers a lot of it, at the
expense of doing fewer loads - and it's quite significant drop. But
that's expected, and it was kinda what this patch was about - prioritise
the small OLTP transactions by doing fewer loads. This is not a patch
that would magically inflate capacity of the system to do more things.

I however agree this does not really represent a typical production OLTP
system. Those systems don't run at 100% saturation, except for short
periods, certainly not if they're doing something latency sensitive. So
a somewhat realistic test would be pgbench throttled at 75% capacity,
leaving some spare capacity for the bulk loads.

I actually tried that, and there are results in [1], but the behavior is
pretty similar to what I'm describing here (except that the system does
actually manages to do more bulk loads, ofc).


https://raw.githubusercontent.com/tvondra/wal-throttle-results/master/syncrep/latencies-1000-full.eps
-----------------------------------------------------------------------
Now let's look at the second file, which shows latency percentiles for
the medium dataset on syncrep. The difference between master (on the
left) and the two throttling builds is pretty obvious. It's not exactly
the same as "no concurrent bulk loads" in the top row, but not far from it.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 1ff530aefb53fbffbfe79a36e270d3d167e3e70f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Sat, 4 Nov 2023 15:05:59 +0100
Subject: [PATCH v5 1/2] v4

---
 src/backend/access/heap/vacuumlazy.c          |   7 +-
 src/backend/access/transam/xlog.c             | 108 ++++++++++++++++++
 src/backend/catalog/system_views.sql          |   1 +
 src/backend/commands/explain.c                |   5 +
 src/backend/executor/instrument.c             |   2 +
 src/backend/replication/syncrep.c             |  21 +++-
 src/backend/tcop/postgres.c                   |   3 +
 src/backend/utils/activity/pgstat_wal.c       |   1 +
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/adt/pgstatfuncs.c           |  10 +-
 src/backend/utils/init/globals.c              |   1 +
 src/backend/utils/misc/guc_tables.c           |  12 ++
 src/include/access/xlog.h                     |   3 +
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/executor/instrument.h             |   1 +
 src/include/miscadmin.h                       |   2 +
 src/include/pgstat.h                          |   3 +-
 src/test/regress/expected/rules.out           |   3 +-
 18 files changed, 178 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 3b9299b8924..58f16dd3093 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -762,10 +762,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 (long long) PageMissOp,
 							 (long long) PageDirtyOp);
 			appendStringInfo(&buf,
-							 _("WAL usage: %lld records, %lld full page images, %llu bytes\n"),
+							 _("WAL usage: %lld records, %lld full page images, %llu bytes"),
 							 (long long) walusage.wal_records,
 							 (long long) walusage.wal_fpi,
 							 (unsigned long long) walusage.wal_bytes);
+			if(walusage.wal_throttled > 0)
+				appendStringInfo(&buf, _("%lld times throttled\n"), (long long) walusage.wal_throttled);
+			else
+				appendStringInfo(&buf, _("\n"));
+
 			appendStringInfo(&buf, _("system usage: %s"), pg_rusage_show(&ru0));
 
 			ereport(verbose ? INFO : LOG,
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6526bd4f432..5dab46e17e1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -73,6 +73,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "portability/instr_time.h"
 #include "port/atomics.h"
 #include "port/pg_iovec.h"
 #include "postmaster/bgwriter.h"
@@ -82,6 +83,7 @@
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -138,6 +140,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_throttle_threshold = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -252,10 +255,16 @@ static int	LocalXLogInsertAllowed = -1;
  * parallel backends may have written WAL records at later LSNs than the value
  * stored here.  The parallel leader advances its own copy, when necessary,
  * in WaitForParallelWorkersToFinish.
+ *
+ * XactLastThrottledRecEnd points to the last XLOG record that should be throttled
+ * as the additional WAL records could be generated before processing interrupts.
+ *
+ * XXX I'm not sure I understand what "record to be throttled" means?
  */
 XLogRecPtr	ProcLastRecPtr = InvalidXLogRecPtr;
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
 XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
+static XLogRecPtr	XactLastThrottledRecEnd = InvalidXLogRecPtr;
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -648,6 +657,14 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+/*
+ * Amount of WAL inserted in this backend since eithe the transaction
+ * start or throttle point (where we reset the counter to 0).
+ *
+ * XXX Not sure it should refer to "backend", it's really about xact, no?
+ */
+uint32	backendWalInserted = 0;
+
 static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
 										XLogRecPtr EndOfLog,
 										TimeLineID newTLI);
@@ -1073,6 +1090,40 @@ XLogInsertRecord(XLogRecData *rdata,
 		pgWalUsage.wal_bytes += rechdr->xl_tot_len;
 		pgWalUsage.wal_records++;
 		pgWalUsage.wal_fpi += num_fpi;
+
+		/*
+		 * Decide if we need to throttle this backend, so that it does not write
+		 * WAL too fast, causing lag against the sync standby (which in turn
+		 * increases latency for standby confirmations). We may be holding locks
+		 * and blocking interrupts here, so we only make the decision, but the
+		 * wait (for sync standby confirmation) happens elsewhere.
+		 *
+		 * The throttling is applied only to large transactions (producing more
+		 * than wal_throttle_threshold kilobytes of WAL). Throttled backends
+		 * can be identified by a new wait event SYNC_REP_THROTTLED.
+		 *
+		 * Small transactions (by amount of produced WAL) are still subject to
+		 * the sync replication, so the same wait happens at commit time.
+		 *
+		 * XXX Not sure this is the right place for a comment explaining how the
+		 * throttling works. This place is way too low level, and rather far from
+		 * the place where the wait actually happens.
+		 *
+		 * XXX Should this be done even if XLogDelayPending is already set? Maybe
+		 * that should only update XactLastThrottledRecEnd, withoug incrementing
+		 * the pgWalUsage.wal_throttled counter?
+		 */
+		backendWalInserted += rechdr->xl_tot_len;
+
+		if ((synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+			(wal_throttle_threshold > 0) &&
+			(backendWalInserted >= wal_throttle_threshold * 1024L))
+		{
+			XactLastThrottledRecEnd = XactLastRecEnd;
+			InterruptPending = true;
+			XLogDelayPending = true;
+			pgWalUsage.wal_throttled++;
+		}
 	}
 
 	return EndPos;
@@ -2635,6 +2686,24 @@ XLogFlush(XLogRecPtr record)
 		return;
 	}
 
+	/*
+	 * Reset WAL throttling bytes counter.
+	 *
+	 * XXX I think this is somewhat wrong. The LSN we want to flush may be
+	 * somewhere in the past, before some (most) of the generated WAL. For
+	 * example assume the backend just wrote 1MB, but then we ask for flush
+	 * with a position 1MB back. Most of the generated WAL is likely after
+	 * the flushed LSN, yet we're setting this to 0. (I don't know how common
+	 * such situation is, probably not very.)
+	 *
+	 * I don't think we can / should track amount of WAL for arbitrary LSN
+	 * values, but maybe we should track the last flush LSN position, and
+	 * then linearly approximate the WAL.
+	 *
+	 * XXX For the approximation we should probably use LogwrtResult.Flush.
+	 */
+	backendWalInserted = 0;
+
 	/* Quick exit if already known flushed */
 	if (record <= LogwrtResult.Flush)
 		return;
@@ -9142,3 +9211,42 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * HandleXLogDelayPending
+ *		Throttle backends generating large amounts of WAL.
+ *
+ * The throttling is implemented by waiting for a sync replica confirmation for
+ * a convenient LSN position. In particular, we do not wait for the current LSN,
+ * which may be in a partially filled WAL page (and we don't want to write this
+ * one out - we'd have to write it out again, causing write amplification).
+ * Instead, we move back to the last fully WAL page.
+ *
+ * Called from ProcessMessageInterrupts() to avoid syncrep waits in XLogInsert(),
+ * which happens in critical section and with blocked interrupts (so it would be
+ * impossible to cancel the wait if it gets stuck). Also, there may be locks held
+ * and we don't want to hold them longer just because of the wait.
+ *
+ * XXX Andres suggested we actually go back a couple pages, to increase the
+ * probability the LSN was already flushed (obviously, this depends on how much
+ * lag we allow).
+ *
+ * XXX Not sure why we use XactLastThrottledRecEnd and not simply XLogRecEnd?
+ */
+void
+HandleXLogDelayPending()
+{
+	XLogRecPtr 	lsn;
+
+	/* calculate last fully filled page */
+	lsn = XactLastThrottledRecEnd - (XactLastThrottledRecEnd % XLOG_BLCKSZ);
+
+	Assert(wal_throttle_threshold > 0);
+	Assert(backendWalInserted >= wal_throttle_threshold * 1024L);
+	Assert(XactLastThrottledRecEnd != InvalidXLogRecPtr);
+
+	XLogFlush(lsn);
+	SyncRepWaitForLSN(lsn, false);
+
+	XLogDelayPending = false;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 11d18ed9dd6..5721abfe5b4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1178,6 +1178,7 @@ CREATE VIEW pg_stat_wal AS
         w.wal_sync,
         w.wal_write_time,
         w.wal_sync_time,
+        w.wal_throttled,
         w.stats_reset
     FROM pg_stat_get_wal() w;
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index f1d71bc54e8..0ec58c13977 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3752,6 +3752,9 @@ show_wal_usage(ExplainState *es, const WalUsage *usage)
 			if (usage->wal_bytes > 0)
 				appendStringInfo(es->str, " bytes=" UINT64_FORMAT,
 								 usage->wal_bytes);
+			if (usage->wal_throttled > 0)
+				appendStringInfo(es->str, " throttled=%lld",
+								 (long long) usage->wal_throttled);
 			appendStringInfoChar(es->str, '\n');
 		}
 	}
@@ -3763,6 +3766,8 @@ show_wal_usage(ExplainState *es, const WalUsage *usage)
 							   usage->wal_fpi, es);
 		ExplainPropertyUInteger("WAL Bytes", NULL,
 								usage->wal_bytes, es);
+		ExplainPropertyUInteger("WAL Throttled", NULL,
+								usage->wal_throttled, es);
 	}
 }
 
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index c383f34c066..b834247e958 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -280,6 +280,7 @@ WalUsageAdd(WalUsage *dst, WalUsage *add)
 	dst->wal_bytes += add->wal_bytes;
 	dst->wal_records += add->wal_records;
 	dst->wal_fpi += add->wal_fpi;
+	dst->wal_throttled += add->wal_throttled;
 }
 
 void
@@ -288,4 +289,5 @@ WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
 	dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
 	dst->wal_records += add->wal_records - sub->wal_records;
 	dst->wal_fpi += add->wal_fpi - sub->wal_fpi;
+	dst->wal_throttled += add->wal_throttled - sub->wal_throttled;
 }
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 0ea71b5c434..152c51fe299 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -144,6 +144,10 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
  * represents a commit record.  If it doesn't, then we wait only for the WAL
  * to be flushed if synchronous_commit is set to the higher level of
  * remote_apply, because only commit records provide apply feedback.
+ *
+ * This may be called either when waiting for PREPARE/COMMIT, of because of WAL
+ * throttling (in which case the flag XLogDelayPending is set to true). We use
+ * different wait events for these cases.
  */
 void
 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
@@ -153,9 +157,10 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	/*
 	 * This should be called while holding interrupts during a transaction
 	 * commit to prevent the follow-up shared memory queue cleanups to be
-	 * influenced by external interruptions.
+	 * influenced by external interruptions. The only exception is WAL throttling
+	 * where this could be called without holding interrupts.
 	 */
-	Assert(InterruptHoldoffCount > 0);
+	Assert(XLogDelayPending == true || InterruptHoldoffCount > 0);
 
 	/*
 	 * Fast exit if user has not requested sync replication, or there are no
@@ -229,6 +234,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	for (;;)
 	{
 		int			rc;
+		uint32		wait_event;
 
 		/* Must reset the latch before testing state. */
 		ResetLatch(MyLatch);
@@ -282,12 +288,21 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 			break;
 		}
 
+		/*
+		 * XLogDelayPending means this syncrep wait happens because of WAL
+		 * throttling. The flag is reset in HandleXLogDelayPending() later.
+		 */
+		if(XLogDelayPending)
+			wait_event = WAIT_EVENT_SYNC_REP_THROTTLED;
+		else
+			wait_event = WAIT_EVENT_SYNC_REP;
+
 		/*
 		 * Wait on latch.  Any condition that should wake us up will set the
 		 * latch, so no need for timeout.
 		 */
 		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
-					   WAIT_EVENT_SYNC_REP);
+					   wait_event);
 
 		/*
 		 * If the postmaster dies, we'll probably never get an acknowledgment,
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7298a187d18..47b64d3a881 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3459,6 +3459,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		HandleParallelApplyMessages();
+
+	if (XLogDelayPending)
+		HandleXLogDelayPending();
 }
 
 /*
diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index 6a81b781357..537876d446d 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -114,6 +114,7 @@ pgstat_flush_wal(bool nowait)
 	WALSTAT_ACC(wal_records, wal_usage_diff);
 	WALSTAT_ACC(wal_fpi, wal_usage_diff);
 	WALSTAT_ACC(wal_bytes, wal_usage_diff);
+	WALSTAT_ACC(wal_throttled, wal_usage_diff);
 	WALSTAT_ACC(wal_buffers_full, PendingWalStats);
 	WALSTAT_ACC(wal_write, PendingWalStats);
 	WALSTAT_ACC(wal_sync, PendingWalStats);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index d7995931bd4..229297ba1cf 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -140,6 +140,7 @@ REPLICATION_SLOT_DROP	"Waiting for a replication slot to become inactive so it c
 RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT	"Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP	"Waiting for confirmation from a remote server during synchronous replication."
+SYNC_REP_THROTTLED	"Waiting for sync replica lag to reduce."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at end of a parallel operation."
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 0cea320c00e..2ffe284cc91 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1452,7 +1452,7 @@ pg_stat_get_io(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_wal(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_COLS	9
+#define PG_STAT_GET_WAL_COLS	10
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_WAL_COLS] = {0};
 	bool		nulls[PG_STAT_GET_WAL_COLS] = {0};
@@ -1477,7 +1477,9 @@ pg_stat_get_wal(PG_FUNCTION_ARGS)
 					   FLOAT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "wal_sync_time",
 					   FLOAT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "wal_throttled",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 
 	BlessTupleDesc(tupdesc);
@@ -1504,7 +1506,9 @@ pg_stat_get_wal(PG_FUNCTION_ARGS)
 	values[6] = Float8GetDatum(((double) wal_stats->wal_write_time) / 1000.0);
 	values[7] = Float8GetDatum(((double) wal_stats->wal_sync_time) / 1000.0);
 
-	values[8] = TimestampTzGetDatum(wal_stats->stat_reset_timestamp);
+	values[8] = Int64GetDatum(wal_stats->wal_throttled);
+
+	values[9] = TimestampTzGetDatum(wal_stats->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 60bc1217fb4..ccbbfae56ac 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -40,6 +40,7 @@ volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
+bool XLogDelayPending = false;
 
 int			MyProcPid;
 pg_time_t	MyStartTime;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 6474e35ec04..b14610e0b6e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2892,6 +2892,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_throttle_after", PGC_USERSET, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates "
+						 " before waiting for sync standby, to limit the replication lag."),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&wal_throttle_threshold,
+		0, 0, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"extra_float_digits", PGC_USERSET, CLIENT_CONN_LOCALE,
 			gettext_noop("Sets the number of digits displayed for floating-point values."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index a14126d164f..9d1175edab2 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -54,6 +54,8 @@ extern PGDLLIMPORT char *wal_consistency_checking_string;
 extern PGDLLIMPORT bool log_checkpoints;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_throttle_threshold;
+extern PGDLLIMPORT uint32 backendWalInserted;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
@@ -250,6 +252,7 @@ extern TimeLineID GetWALInsertionTimeLine(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
+extern void HandleXLogDelayPending(void);
 
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb58dee3bcd..362d0e327e8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5770,9 +5770,9 @@
 { oid => '1136', descr => 'statistics: information about WAL activity',
   proname => 'pg_stat_get_wal', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int8,int8,numeric,int8,int8,int8,float8,float8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o}',
-  proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}',
+  proallargtypes => '{int8,int8,numeric,int8,int8,int8,float8,float8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,wal_throttled,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 { oid => '6248', descr => 'statistics: information about WAL prefetching',
   proname => 'pg_stat_get_recovery_prefetch', prorows => '1', proretset => 't',
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index d5d69941c52..58eca5b7a1d 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -53,6 +53,7 @@ typedef struct WalUsage
 	int64		wal_records;	/* # of WAL records produced */
 	int64		wal_fpi;		/* # of WAL full page images produced */
 	uint64		wal_bytes;		/* size of WAL records produced */
+	int64		wal_throttled;		/* # of times WAL throttling was engaged */
 } WalUsage;
 
 /* Flag bits included in InstrAlloc's instrument_options bitmask */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index f0cc651435c..44641e0e4cb 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -95,6 +95,8 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
+/* doesn't need to be volatile sig_atomic_t as it's not set by signal handler */
+extern PGDLLIMPORT bool XLogDelayPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f95d8db0c4c..92618567868 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -235,7 +235,7 @@ typedef struct PgStat_TableXactStatus
  * ------------------------------------------------------------
  */
 
-#define PGSTAT_FILE_FORMAT_ID	0x01A5BCAC
+#define PGSTAT_FILE_FORMAT_ID	0x0907AFBD
 
 typedef struct PgStat_ArchiverStats
 {
@@ -435,6 +435,7 @@ typedef struct PgStat_WalStats
 	PgStat_Counter wal_sync;
 	PgStat_Counter wal_write_time;
 	PgStat_Counter wal_sync_time;
+	PgStat_Counter wal_throttled; /* how many times backend was throttled */
 	TimestampTz stat_reset_timestamp;
 } PgStat_WalStats;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 05070393b99..91d88e6dfb1 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2230,8 +2230,9 @@ pg_stat_wal| SELECT wal_records,
     wal_sync,
     wal_write_time,
     wal_sync_time,
+    wal_throttled,
     stats_reset
-   FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, stats_reset);
+   FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, wal_throttled, stats_reset);
 pg_stat_wal_receiver| SELECT pid,
     status,
     receive_start_lsn,
-- 
2.41.0

From 4a14b9edbcf79075c050cd60fc28a80c53452bc1 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Fri, 10 Nov 2023 17:37:41 +0100
Subject: [PATCH v5 2/2] rework

---
 src/backend/access/transam/xact.c   |   3 +
 src/backend/access/transam/xlog.c   | 338 +++++++++++++++++++++++++---
 src/backend/utils/misc/guc_tables.c |  28 ++-
 src/include/access/xlog.h           |   6 +-
 4 files changed, 345 insertions(+), 30 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 536edb3792f..9a333e8533e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2139,6 +2139,9 @@ StartTransaction(void)
 	 */
 	s->state = TRANS_INPROGRESS;
 
+	/* reset limits for WAL throttling */
+	ResetXLogThrottling();
+
 	ShowTransactionState("StartTransaction");
 }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5dab46e17e1..b44e3c84d3d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -140,7 +140,16 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
-int			wal_throttle_threshold = 0;
+
+/* user-settable parameters for WAL throttling */
+int			wal_write_after = 0;
+int			wal_flush_after_local = 0;
+int			wal_flush_after_remote = 0;
+
+/* auto-adjusted version of the throttling parameters */
+int			wal_write_after_current = 0;
+int			wal_flush_after_local_current = 0;
+int			wal_flush_after_remote_current = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -255,16 +264,10 @@ static int	LocalXLogInsertAllowed = -1;
  * parallel backends may have written WAL records at later LSNs than the value
  * stored here.  The parallel leader advances its own copy, when necessary,
  * in WaitForParallelWorkersToFinish.
- *
- * XactLastThrottledRecEnd points to the last XLOG record that should be throttled
- * as the additional WAL records could be generated before processing interrupts.
- *
- * XXX I'm not sure I understand what "record to be throttled" means?
  */
 XLogRecPtr	ProcLastRecPtr = InvalidXLogRecPtr;
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
 XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
-static XLogRecPtr	XactLastThrottledRecEnd = InvalidXLogRecPtr;
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -663,7 +666,8 @@ static MemoryContext walDebugCxt = NULL;
  *
  * XXX Not sure it should refer to "backend", it's really about xact, no?
  */
-uint32	backendWalInserted = 0;
+uint32		backendWalInserted = 0;
+XLogRecPtr	backendLastFlushPtr = InvalidXLogRecPtr;
 
 static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
 										XLogRecPtr EndOfLog,
@@ -1094,17 +1098,48 @@ XLogInsertRecord(XLogRecData *rdata,
 		/*
 		 * Decide if we need to throttle this backend, so that it does not write
 		 * WAL too fast, causing lag against the sync standby (which in turn
-		 * increases latency for standby confirmations). We may be holding locks
-		 * and blocking interrupts here, so we only make the decision, but the
-		 * wait (for sync standby confirmation) happens elsewhere.
+		 * increases latency both for local commits and for standby confirmations).
+		 * We may be holding locks and blocking interrupts here, so we only make
+		 * the decisions here, but the wait (for sync standby confirmation)
+		 * happens elsewhere in processing interrupts.
+		 *
+		 * This is somewhat similar to what walwriter does, except that walwriter
+		 * is driven by time (not by amount of WAL written), and does not throttle
+		 * the backends (in a way, the intention is to do exactly the opposite by
+		 * taking away the responsibility for writing WAL from backends).
+		 *
+		 * We may decide to do one or more of these things:
+		 *
+		 * - write the WAL locally (in the background, no throttle)
+		 * - flush the WAL locally (throttle, if synchronous_commit == local)
+		 * - wait for sync replica (throttle, if synchronous_commit != local)
+		 *
+		 * The first point (driven by wal_write_after) is meant to start writing
+		 * WAL to disk early, so that the subsequent flush is faster. This does
+		 * not do any flushes, it merely writes WAL data to page cache.
 		 *
-		 * The throttling is applied only to large transactions (producing more
-		 * than wal_throttle_threshold kilobytes of WAL). Throttled backends
-		 * can be identified by a new wait event SYNC_REP_THROTTLED.
+		 * The second point (driven by wal_flush_after_local) forces the backend
+		 * to actually flush WAL to disk, to reduce the amount of unflushed WAL.
+		 * This is a synchronous operation, and is applied only to transactions
+		 * that generate enough WAL (e.g. 256kB). The goal is to limit impact of
+		 * large transactions (producing a lot of WAL) on small OLTP ones.
+		 *
+		 * The third point (driven by wal_flush_after_remote) is similar, except
+		 * that it throttles based on confirmations from a sync replica.
+		 *
+		 * The throttling is applied only to large transactions, and the backends
+		 * affected by the throttling can be identified by a new wait event
+		 * SYNC_REP_THROTTLED.
 		 *
 		 * Small transactions (by amount of produced WAL) are still subject to
 		 * the sync replication, so the same wait happens at commit time.
 		 *
+		 * All of those thresholds are auto-tuned, i.e. the value is adjusted
+		 * based on concurrent activity. All the large transactions need to
+		 * share the WAL budgets, somehow. We do that by adjusting the value
+		 * by observing the distance since the last write/flush, and comparing
+		 * it to the current limit.
+		 *
 		 * XXX Not sure this is the right place for a comment explaining how the
 		 * throttling works. This place is way too low level, and rather far from
 		 * the place where the wait actually happens.
@@ -1112,14 +1147,47 @@ XLogInsertRecord(XLogRecData *rdata,
 		 * XXX Should this be done even if XLogDelayPending is already set? Maybe
 		 * that should only update XactLastThrottledRecEnd, withoug incrementing
 		 * the pgWalUsage.wal_throttled counter?
+		 *
+		 * XXX Maybe the different cases should have separate wait events?
+		 *
+		 * XXX Note that for the remote WAL, we need to also enforce the local
+		 * flush throttling, because we only send data to replica after it was
+		 * flushed locally.
+		 *
+		 * XXX I guess to make this work well, we should ensure
+		 * (wal_write_after < wal_flush_after_local < wal_flush_after_remove)
 		 */
 		backendWalInserted += rechdr->xl_tot_len;
 
-		if ((synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
-			(wal_throttle_threshold > 0) &&
-			(backendWalInserted >= wal_throttle_threshold * 1024L))
+		/*
+		 * Should we issue the WAL write to local disk (without a flush)?
+		 */
+		if ((wal_write_after_current > 0) &&
+			(backendWalInserted >= wal_write_after_current * 1024L))
+		{
+			InterruptPending = true;
+			XLogDelayPending = true;
+		}
+
+		/*
+		 * Should we flush the WAL to local disk?
+		 */
+		else if ((synchronous_commit >= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) &&
+			(wal_flush_after_local_current > 0) &&
+			(backendWalInserted >= wal_flush_after_local_current * 1024L))
+		{
+			InterruptPending = true;
+			XLogDelayPending = true;
+			pgWalUsage.wal_throttled++;
+		}
+
+		/*
+		 * Should we flush the WAL to sync replica?
+		 */
+		else if ((synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+			(wal_flush_after_remote_current > 0) &&
+			(backendWalInserted >= wal_flush_after_remote_current * 1024L))
 		{
-			XactLastThrottledRecEnd = XactLastRecEnd;
 			InterruptPending = true;
 			XLogDelayPending = true;
 			pgWalUsage.wal_throttled++;
@@ -2703,6 +2771,7 @@ XLogFlush(XLogRecPtr record)
 	 * XXX For the approximation we should probably use LogwrtResult.Flush.
 	 */
 	backendWalInserted = 0;
+	backendLastFlushPtr = record;
 
 	/* Quick exit if already known flushed */
 	if (record <= LogwrtResult.Flush)
@@ -9234,19 +9303,234 @@ SetWalWriterSleeping(bool sleeping)
  * XXX Not sure why we use XactLastThrottledRecEnd and not simply XLogRecEnd?
  */
 void
-HandleXLogDelayPending()
+HandleXLogDelayPending(void)
 {
-	XLogRecPtr 	lsn;
+	/* need to remember the values, because XLogFlush may reset them */
+	uint32		wal_inserted = backendWalInserted;
+	XLogRecPtr	last_flush_lsn = backendLastFlushPtr;
+	XLogRecPtr	flush_lsn = InvalidXLogRecPtr;
+
+	/* make sure we're not in recovery - timeline could change, etc. */
+	Assert(!InRecovery);
+
+	/*
+	 * XactLastRecEnd gets reset once in a while (e.g. after commit or abort),
+	 * and if such record just gets us over the throttling threshold, we'll see
+	 * 0 here. We just ignore the trottling request in that case - one might
+	 * argue the commit already does the throttling anyway (flush and wait
+	 * for sync replica), and it should be a fairly rare case (How likely is
+	 * it that a large transaction crosses the limit just with when writing
+	 * the commit record?).
+	 *
+	 * XXX An alternative would be to have a separate variable, storing the
+	 * XactLastRecEnd value we've seen while deciding to throttle (in fact an
+	 * earlier patch did that). But it does not seem worth it.
+	 */
+	if (XactLastRecEnd == InvalidXLogRecPtr)
+	{
+		XLogDelayPending = false;
+		return;
+	}
+
+	/*
+	 * read LogwrtResult and update local state
+	 *
+	 * XXX Maybe we should use the cached local LogwrtResult copy to determine if
+	 * we need to a write/flush? That wouldn't require a spinlock, and in the worst
+	 * case we'd see a bit stale values and perhaps issue a write/flush early.
+	 * However, we shouldn't to this throttling very often, so seems fine.
+	 */
+	SpinLockAcquire(&XLogCtl->info_lck);
+	LogwrtResult = XLogCtl->LogwrtResult;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * First decide if we need to issue a write to local WAL. We assume this LSN
+	 * will be ahead of LSN for either flush (local or remote), so let's do this
+	 * first.
+	 */
+	if ((wal_write_after_current > 0) &&
+		(wal_inserted >= wal_write_after_current * 1024L))
+	{
+		/*
+		 * We want to keep write lag below wal_write_after_current, but we don't
+		 * want to write partial pages (because of write amplification).
+		 */
+		XLogRecPtr 	lsn = XactLastRecEnd - wal_write_after_current * 1024L;
+
+		/* truncate to the last full page */
+		lsn = lsn - (lsn % XLOG_BLCKSZ);
+
+		/* Do the WAL write unless someone already issued the write earlier. */
+		if (lsn > LogwrtResult.Write)
+		{
+			START_CRIT_SECTION();
+
+			/*
+			 * Before actually performing the write, wait for all in-flight
+			 * insertions to the pages we're about to write to finish.
+			 */
+			lsn = WaitXLogInsertionsToFinish(lsn);
+
+			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+			LogwrtResult = XLogCtl->LogwrtResult;
+			if (LogwrtResult.Write >= lsn)
+			{
+				/* OK, someone wrote it already */
+				LWLockRelease(WALWriteLock);
+			}
+			else
+			{
+				XLogwrtRqst WriteRqst;
+
+				/* Have to write it ourselves */
+				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+				WriteRqst.Write = lsn;
+				WriteRqst.Flush = 0;
+				XLogWrite(WriteRqst, XLogCtl->InsertTimeLineID, false);
+				LWLockRelease(WALWriteLock);
+				PendingWalStats.wal_buffers_full++;		// FIXME
+				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+			}
+
+			END_CRIT_SECTION();
+		}
+	}
+
+	/* Now consider if we should do a local flush. */
+	if ((synchronous_commit >= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) &&
+		(wal_flush_after_local_current > 0) &&
+		(wal_inserted >= wal_flush_after_local_current * 1024L))
+	{
+		/* flush lag should be less than wal_flush_after_local_current (kB) */
+		XLogRecPtr 	lsn = XactLastRecEnd - wal_flush_after_local_current * 1024L;
+
+		/* truncate to the last full */
+		lsn = lsn - (lsn % XLOG_BLCKSZ);
+
+		/* do the flush (will reset backendWalInserted etc.) */
+		XLogFlush(lsn);
+
+		/* remember the LSN of this flush */
+		flush_lsn = lsn;
+	}
+
+	/*
+	 * Finally, decide if we should wait for confirmation from a sync replica.
+	 */
+	if ((synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+		(wal_flush_after_remote_current > 0) &&
+		(wal_inserted >= wal_flush_after_remote_current * 1024L))
+	{
+		/* sync flush lag should be less than wal_flush_after_remote_current (kB) */
+		XLogRecPtr 	lsn = XactLastRecEnd - wal_flush_after_remote_current * 1024L;
+
+		/* truncate to the last full page */
+		lsn = lsn - (lsn % XLOG_BLCKSZ);
+
+		SyncRepWaitForLSN(lsn, false);
+	}
+
+	/*
+	 * Now adjust the thresholds based on the distance since the last flush in this
+	 * backend, and the fraction of WAL this backend was responsible for.
+	 */
+	if (flush_lsn != InvalidXLogRecPtr)
+	{
+		/* Fraction of WAL between the flushes was produced by this backend. */
+		double wal_fraction = (double) wal_inserted / (flush_lsn - last_flush_lsn);
+
+		double	sum = 0;
+		int		cnt = 0;
+
+		/*
+		 * If the fraction is sufficiently far from 1.0 or close to it, we adjust
+		 * the limits to get close to the "ideal" value based on the fraction. We
+		 * don't immediately switch to the new value, but instead change the values
+		 * gradually by halving/doubling the previous value.
+		 *
+		 * The fraction is guaranteed to be less than 1.0, so we halve the limit
+		 * for low values below 0.5, double the limit for values above 0.9, and
+		 * do nothing for values in between.
+		 *
+		 * We calculate the desired values for each enabled limit (skip those set
+		 * to 0), and then use the average fraction to maintain the proportion.
+		 */
 
-	/* calculate last fully filled page */
-	lsn = XactLastThrottledRecEnd - (XactLastThrottledRecEnd % XLOG_BLCKSZ);
+		if (wal_fraction < 0.5)
+		{
+			wal_write_after_current /= 2;
+			wal_flush_after_local_current /= 2;
+			wal_flush_after_remote_current /= 2;
+		}
+		else if (wal_fraction > 0.9)
+		{
+			wal_write_after_current *= 2;
+			wal_flush_after_local_current *= 2;
+			wal_flush_after_remote_current *= 2;
+		}
+
+
+		/* clamp to correct range */
+
+		wal_write_after_current
+			= Min(wal_write_after, Max(1, wal_write_after_current));
+
+		wal_flush_after_local_current
+			= Min(wal_flush_after_local, Max(1, wal_flush_after_local_current));
+
+		wal_flush_after_remote_current
+			= Min(wal_flush_after_remote, Max(1, wal_flush_after_remote_current));
+
+
+		/* calculate the average fraction */
+
+		if (wal_write_after > 0)
+		{
+			sum += (double) wal_write_after_current / wal_write_after;
+			cnt += 1;
+		}
+
+		if (wal_flush_after_local > 0)
+		{
+			sum += (double) wal_flush_after_local_current / wal_flush_after_local;
+			cnt += 1;
+		}
+
+		if (wal_flush_after_remote > 0)
+		{
+			sum += (double) wal_flush_after_remote_current / wal_flush_after_remote;
+			cnt += 1;
+		}
+
+		Assert(cnt > 0);
+
+		/* apply the average fraction */
+
+		wal_write_after_current = wal_write_after * (sum / cnt);
+		wal_flush_after_local_current = wal_flush_after_local * (sum / cnt);
+		wal_flush_after_remote_current = wal_flush_after_remote * (sum / cnt);
 
-	Assert(wal_throttle_threshold > 0);
-	Assert(backendWalInserted >= wal_throttle_threshold * 1024L);
-	Assert(XactLastThrottledRecEnd != InvalidXLogRecPtr);
+		/* clamp to accepted ranges */
 
-	XLogFlush(lsn);
-	SyncRepWaitForLSN(lsn, false);
+		wal_write_after_current
+			= Min(wal_write_after, Max(1, wal_write_after_current));
+
+		wal_flush_after_local_current
+			= Min(wal_flush_after_local, Max(1, wal_flush_after_local_current));
+
+		wal_flush_after_remote_current
+			= Min(wal_flush_after_remote, Max(1, wal_flush_after_remote_current));
+	}
 
 	XLogDelayPending = false;
 }
+
+void
+ResetXLogThrottling(void)
+{
+	wal_write_after_current = wal_write_after;
+	wal_flush_after_local_current = wal_flush_after_local;
+	wal_flush_after_remote_current = wal_flush_after_remote;
+}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index b14610e0b6e..5f98de1f356 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2893,13 +2893,37 @@ struct config_int ConfigureNamesInt[] =
 	},
 
 	{
-		{"wal_throttle_after", PGC_USERSET, REPLICATION_SENDING,
+		{"wal_write_after", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates "
 						 " before waiting for sync standby, to limit the replication lag."),
 			NULL,
 			GUC_UNIT_KB
 		},
-		&wal_throttle_threshold,
+		&wal_write_after,
+		0, 0, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"wal_flush_after_local", PGC_USERSET, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates "
+						 " before waiting for sync standby, to limit the replication lag."),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&wal_flush_after_local,
+		0, 0, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"wal_flush_after_remote", PGC_USERSET, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates "
+						 " before waiting for sync standby, to limit the replication lag."),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&wal_flush_after_remote,
 		0, 0, MAX_KILOBYTES,
 		NULL, NULL, NULL
 	},
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9d1175edab2..cb7a15c9520 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -54,7 +54,9 @@ extern PGDLLIMPORT char *wal_consistency_checking_string;
 extern PGDLLIMPORT bool log_checkpoints;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
-extern PGDLLIMPORT int wal_throttle_threshold;
+extern PGDLLIMPORT int wal_write_after;
+extern PGDLLIMPORT int wal_flush_after_local;
+extern PGDLLIMPORT int wal_flush_after_remote;
 extern PGDLLIMPORT uint32 backendWalInserted;
 
 extern PGDLLIMPORT int CheckPointSegments;
@@ -253,6 +255,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 extern void HandleXLogDelayPending(void);
+extern void HandleXLogDelayPending(void);
+extern void ResetXLogThrottling(void);
 
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
-- 
2.41.0

Reply via email to