From 0aff0fb1bf3bb6c3bdf73bc32fb8735e0ed427ee Mon Sep 17 00:00:00 2001
From: Jakub Wartak <jakub.wartak@enterprisedb.com>
Date: Tue, 24 Jan 2023 09:45:14 +0000
Subject: [PATCH] GUC synchronous_commit_flush_wal_after

---
 src/backend/access/transam/xlog.c   | 19 +++++++++++++++++++
 src/backend/utils/misc/guc_tables.c | 12 ++++++++++++
 src/include/access/xlog.h           |  1 +
 3 files changed, 32 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a31fbbff78..d482f58f2c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -82,6 +82,7 @@
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -138,6 +139,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int		synchronous_commit_flush_wal_after = 0; /* kb */
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -638,6 +640,8 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+static uint32	backendWalInserted = 0;
+
 static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
 										XLogRecPtr EndOfLog,
 										TimeLineID newTLI);
@@ -1021,6 +1025,21 @@ XLogInsertRecord(XLogRecData *rdata,
 		pgWalUsage.wal_bytes += rechdr->xl_tot_len;
 		pgWalUsage.wal_records++;
 		pgWalUsage.wal_fpi += num_fpi;
+
+		backendWalInserted += rechdr->xl_tot_len;
+
+		if ((synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_APPLY || synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+			synchronous_commit_flush_wal_after > 0 &&
+			backendWalInserted > synchronous_commit_flush_wal_after * 1024L)
+		{
+			elog(DEBUG3, "throttling WAL down on this session (backendWalInserted=%d)", backendWalInserted);
+			XLogFlush(EndPos);
+			/* XXX: refactor SyncRepWaitForLSN() to have different waitevent than default WAIT_EVENT_SYNC_REP  */
+			/* maybe new WAIT_EVENT_SYNC_REP_BIG or something like that */
+			SyncRepWaitForLSN(EndPos, false);
+			elog(DEBUG3, "throttling WAL down on this session - end");
+			backendWalInserted = 0;
+		}
 	}
 
 	return EndPos;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 1bf14eec66..9678a6f66e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2799,6 +2799,17 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"synchronous_commit_flush_wal_after", PGC_USERSET, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum logged WAL in kbytes, after which wait for sync commit confiration even without commit "),
+			NULL
+			GUC_UNIT_KB
+		},
+		&synchronous_commit_flush_wal_after,
+		0, 0, 1024L*1024L,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"extra_float_digits", PGC_USERSET, CLIENT_CONN_LOCALE,
 			gettext_noop("Sets the number of digits displayed for floating-point values."),
@@ -2945,6 +2956,7 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+
 	{
 		{"max_worker_processes",
 			PGC_POSTMASTER,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1fbd48fbda..d17a948662 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -51,6 +51,7 @@ extern PGDLLIMPORT char *wal_consistency_checking_string;
 extern PGDLLIMPORT bool log_checkpoints;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int synchronous_commit_flush_wal_after;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
-- 
2.30.2

