From f99fe4b164fcaa58675f832b1497d70b42c22c91 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Thu, 5 Sep 2024 15:31:38 +0530
Subject: [PATCH v1] Implements Clock-skew management between nodes.

This patch attempts to manage clock skew between nodes by
introducing three new GUCs:
a) max_logical_rep_clock_skew
b) max_logical_rep_clock_skew_action
c) max_logical_rep_clock_skew_wait

If the timestamp of the currently replayed transaction is in the future
compared to the current time on the subscriber and the difference is
larger than 'max_logical_rep_clock_skew', then the action configured
in 'max_logical_rep_clock_skew_action' is performed by the apply worker.

If the user configures 'wait' in 'max_logical_rep_clock_skew_action', the
apply worker waits at the 'begin' of the transaction until the clock skew is
within the permissible range of 'max_logical_rep_clock_skew'.

There could be cases where actual clock skew is large while the configured
'max_logical_rep_clock_skew' is small. Then the apply worker may have to
wait for a longer period to manage the clock skew. To control this
maximum wait time, a new GUC, 'max_logical_rep_clock_skew_wait' is
provided.  This allows the user to set a cap on how long the apply
worker should wait. If the computed wait time exceeds this value,
the apply worker will error out without waiting.
---
 .../replication/logical/applyparallelworker.c |  14 +-
 src/backend/replication/logical/worker.c      | 125 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  40 ++++++
 src/backend/utils/misc/postgresql.conf.sample |   9 +-
 src/include/replication/logicalworker.h       |  18 +++
 src/include/replication/worker_internal.h     |   2 +-
 src/include/utils/timestamp.h                 |   1 +
 src/tools/pgindent/typedefs.list              |   1 +
 9 files changed, 205 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index e7f7d4c5e4..eb68437654 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -312,6 +312,13 @@ pa_can_start(void)
 	if (!AllTablesyncsReady())
 		return false;
 
+	/*
+	 * Do not start a new parallel worker if user has configured max clock
+	 * skew, as we need the commit timestamp in the beginning.
+	 */
+	if ((max_logical_rep_clock_skew > LR_CLOCK_SKEW_DEFAULT))
+		return false;
+
 	return true;
 }
 
@@ -696,9 +703,14 @@ pa_process_spooled_messages_if_required(void)
 	}
 	else if (fileset_state == FS_READY)
 	{
+		/*
+		 * Currently we do not support starting parallel apply worker when
+		 * clock skew is configured, thus it is okay to pass 0 as
+		 * origin-timestamp here.
+		 */
 		apply_spooled_messages(&MyParallelShared->fileset,
 							   MyParallelShared->xid,
-							   InvalidXLogRecPtr);
+							   InvalidXLogRecPtr, 0);
 		pa_set_fileset_state(MyParallelShared, FS_EMPTY);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 925dff9cc4..454f07deb6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -318,6 +318,20 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool		InitializingApplyWorker = false;
 
+/*
+ * Values accepted by the max_logical_rep_clock_skew_action GUC.
+ */
+const struct config_enum_entry logical_rep_clock_skew_action_options[] = {
+	{"error", LR_CLOCK_SKEW_ACTION_ERROR, false},
+	{"wait", LR_CLOCK_SKEW_ACTION_WAIT, false},
+	{NULL, 0, false}
+};
+
+/* GUCs controlling how the apply worker handles clock skew */
+int			max_logical_rep_clock_skew = LR_CLOCK_SKEW_DEFAULT;
+int			max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR;
+int			max_logical_rep_clock_skew_wait = 300;	/* seconds (5 mins) */
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -982,6 +996,95 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * Manage clock skew between nodes.
+ *
+ * If origin_timestamp is ahead of the local clock by at least
+ * max_logical_rep_clock_skew seconds, perform the action selected by
+ * max_logical_rep_clock_skew_action: raise an error, or wait.
+ */
+static void
+manage_clock_skew(TimestampTz origin_timestamp)
+{
+	TimestampTz current;
+	TimestampTz delayUntil;
+	long		msecs;
+	int			rc;
+
+	/* nothing to do if no max clock skew configured */
+	if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT)
+		return;
+
+	current = GetCurrentTimestamp();
+
+	/*
+	 * Act only if the remote timestamp is ahead of the local clock by at
+	 * least max_logical_rep_clock_skew seconds.  The threshold is computed
+	 * in 64-bit microseconds, because scaling the GUC to milliseconds in int
+	 * arithmetic would overflow for settings above INT_MAX / 1000.
+	 */
+	if (origin_timestamp > current &&
+		origin_timestamp >= TimestampTzPlusSeconds(current,
+												   max_logical_rep_clock_skew))
+	{
+		if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("clock skew exceeds max_logical_rep_clock_skew (%d seconds)",
+							max_logical_rep_clock_skew)));
+
+		/* Perform the wait */
+		while (true)
+		{
+			delayUntil = TimestampTzMinusSeconds(origin_timestamp,
+												 max_logical_rep_clock_skew);
+
+			/* Exit without waiting if it's already past 'delayUntil' time */
+			msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+													delayUntil);
+			if (msecs <= 0)
+				break;
+
+			/* Enforce max_logical_rep_clock_skew_wait; 0 turns the limit off */
+			if (max_logical_rep_clock_skew_wait > 0 &&
+				msecs > (max_logical_rep_clock_skew_wait * 1000L))
+				ereport(ERROR,
+						(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+						 errmsg("clock skew wait time exceeds max_logical_rep_clock_skew_wait (%d seconds)",
+								max_logical_rep_clock_skew_wait)));
+
+			elog(DEBUG2, "delaying apply for %ld milliseconds to manage clock skew",
+				 msecs);
+
+			/* Sleep until we are signaled or msecs have elapsed */
+			rc = WaitLatch(MyLatch,
+						   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						   msecs, WAIT_EVENT_LOGICAL_CLOCK_SKEW);
+
+			/* Exit the loop if msecs have elapsed */
+			if (rc & WL_TIMEOUT)
+				break;
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			/*
+			 * A reload may change the clock-skew GUCs; stop waiting if the
+			 * check has been turned off.
+			 */
+			if (ConfigReloadPending)
+			{
+				ConfigReloadPending = false;
+				ProcessConfigFile(PGC_SIGHUP);
+				if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT)
+					break;
+			}
+		}
+	}
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -1003,6 +1106,9 @@ apply_handle_begin(StringInfo s)
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
+
+	/* Check if there is any clock skew and perform configured action */
+	manage_clock_skew(begin_data.committime);
 }
 
 /*
@@ -1060,6 +1166,9 @@ apply_handle_begin_prepare(StringInfo s)
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
+
+	/* Check if there is any clock skew and perform configured action */
+	manage_clock_skew(begin_data.prepare_time);
 }
 
 /*
@@ -1315,7 +1424,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
-								   prepare_data.xid, prepare_data.prepare_lsn);
+								   prepare_data.xid, prepare_data.prepare_lsn,
+								   prepare_data.prepare_time);
 
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
@@ -2020,7 +2130,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
  */
 void
 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-					   XLogRecPtr lsn)
+					   XLogRecPtr lsn,
+					   TimestampTz origin_timestamp)
 {
 	int			nchanges;
 	char		path[MAXPGPATH];
@@ -2073,6 +2184,13 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 
 	end_replication_step();
 
+	/*
+	 * If origin_timestamp is provided by caller, then check clock skew with
+	 * respect to the passed time and take configured action.
+	 */
+	if (origin_timestamp)
+		manage_clock_skew(origin_timestamp);
+
 	/*
 	 * Read the entries one by one and pass them through the same logic as in
 	 * apply_dispatch.
@@ -2178,7 +2296,8 @@ apply_handle_stream_commit(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
-								   commit_data.commit_lsn);
+								   commit_data.commit_lsn,
+								   commit_data.committime);
 
 			apply_handle_commit_internal(&commit_data);
 
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 8efb4044d6..0ebad6fcab 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -59,6 +59,7 @@ CHECKPOINTER_MAIN	"Waiting in main loop of checkpointer process."
 LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
+LOGICAL_CLOCK_SKEW	"Waiting in logical replication apply process for clock skew to come within the permissible range."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..c768a11963 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -68,6 +68,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
@@ -482,6 +483,7 @@ extern const struct config_enum_entry archive_mode_options[];
 extern const struct config_enum_entry recovery_target_action_options[];
 extern const struct config_enum_entry wal_sync_method_options[];
 extern const struct config_enum_entry dynamic_shared_memory_options[];
+extern const struct config_enum_entry logical_rep_clock_skew_action_options[];
 
 /*
  * GUC option variables that are exported from this module
@@ -3714,6 +3716,33 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets maximum clock skew tolerance between logical "
+						 "replication nodes beyond which action configured "
+						 "in max_logical_rep_clock_skew_action is triggered."),
+			gettext_noop("-1 turns this check off."),
+			GUC_UNIT_S
+		},
+		&max_logical_rep_clock_skew,
+		LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"max_logical_rep_clock_skew_wait", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets max limit on how long apply worker shall wait to "
+						 "bring clock skew within permissible range of max_logical_rep_clock_skew. "
+						 "If the computed wait time is more than this value, "
+						 "apply worker will error out without waiting."),
+			gettext_noop("0 turns this limit off."),
+			GUC_UNIT_S
+		},
+		&max_logical_rep_clock_skew_wait,
+		300, 0, 3600,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
@@ -4991,6 +5020,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets the action to perform if a clock skew higher "
+						 "than max_logical_rep_clock_skew is detected."),
+			NULL
+		},
+		&max_logical_rep_clock_skew_action,
+		LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"track_functions", PGC_SUSET, STATS_CUMULATIVE,
 			gettext_noop("Collects function-level statistics on database activity."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..6424432362 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -383,7 +383,14 @@
 					# (change requires restart)
 #max_sync_workers_per_subscription = 2	# taken from max_logical_replication_workers
 #max_parallel_apply_workers_per_subscription = 2	# taken from max_logical_replication_workers
-
+#max_logical_rep_clock_skew = -1	# maximum clock skew (in seconds) tolerated
+					# between logical replication nodes before the action in
+					# max_logical_rep_clock_skew_action is taken; -1 disables
+#max_logical_rep_clock_skew_action = error # error or wait
+					   # (change requires restart)
+#max_logical_rep_clock_skew_wait = 300 # maximum time (in seconds) the apply
+					# worker may wait for clock skew to come within
+					# max_logical_rep_clock_skew; longer waits raise an error
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index a18d79d1b2..7cb03062ac 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,7 +14,25 @@
 
 #include <signal.h>
 
+/*
+ * The default for max_logical_rep_clock_skew is -1, which means ignore clock
+ * skew (the check is turned off).
+ */
+#define LR_CLOCK_SKEW_DEFAULT -1
+
+/*
+ * Actions selectable through max_logical_rep_clock_skew_action.
+ */
+typedef enum
+{
+	LR_CLOCK_SKEW_ACTION_ERROR, /* raise an ERROR when the skew is exceeded */
+	LR_CLOCK_SKEW_ACTION_WAIT,	/* wait until the skew is within the limit */
+} LogicalRepClockSkewAction;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
+extern PGDLLIMPORT int max_logical_rep_clock_skew;
+extern PGDLLIMPORT int max_logical_rep_clock_skew_action;
+extern PGDLLIMPORT int max_logical_rep_clock_skew_wait;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9646261d7e..95b2a5286d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -268,7 +268,7 @@ extern void stream_stop_internal(TransactionId xid);
 
 /* Common streaming function to apply all the spooled messages */
 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-								   XLogRecPtr lsn);
+								   XLogRecPtr lsn, TimestampTz origin_timestamp);
 
 extern void apply_dispatch(StringInfo s);
 
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index a6ce03ed46..53b828d89d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X)
 /* Macros for doing timestamp arithmetic without assuming timestamp's units */
 #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
 #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))
+#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000))
 
 
 /* Set at postmaster start */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b6135f0347..5a4e86fccd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1566,6 +1566,7 @@ LogicalOutputPluginWriterPrepareWrite
 LogicalOutputPluginWriterUpdateProgress
 LogicalOutputPluginWriterWrite
 LogicalRepBeginData
+LogicalRepClockSkewAction
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
-- 
2.34.1

