From 1969704830317946825ebd1b3c23f7fc80a3157d Mon Sep 17 00:00:00 2001
From: Shi Yu <shiy.fnst@fujitsu.com>
Date: Mon, 24 Oct 2022 15:52:14 +0800
Subject: [PATCH v1] Allow streaming every change without waiting till
 logical_decoding_work_mem.

Add a new GUC force_stream_mode, when it is set on, send the change to
output plugin immediately in streaming mode. Otherwise, send until
logical_decoding_work_mem is exceeded.
---
 doc/src/sgml/config.sgml                      | 16 ++++++++++
 .../replication/logical/reorderbuffer.c       | 31 ++++++++++++++-----
 src/backend/utils/misc/guc_tables.c           | 10 ++++++
 src/include/replication/reorderbuffer.h       |  1 +
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index ff6fcd902a..874d1fc6bc 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11540,6 +11540,22 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-force-stream-mode" xreflabel="force_stream_mode">
+      <term><varname>force_stream_mode</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>force_stream_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies wheather to force sending the changes to output plugin
+        immediately in streaming mode. If set to <literal>off</literal> (the
+        default), send until <varname>logical_decoding_work_mem</varname> is
+        exceeded.
+       </para>
+      </listitem>
+     </varlistentry>
+
     </variablelist>
   </sect1>
   <sect1 id="runtime-config-short">
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381f2d..848dd7e12f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -209,6 +209,12 @@ typedef struct ReorderBufferDiskChange
 int			logical_decoding_work_mem;
 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 
+/*
+ * Wheather to send the change to output plugin immediately in streaming mode.
+ * When it is off, wait until logical_decoding_work_mem is exceeded.
+ */
+bool		force_stream_mode;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -3527,20 +3533,29 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
 
-	/* bail out if we haven't exceeded the memory limit */
-	if (rb->size < logical_decoding_work_mem * 1024L)
+	/*
+	 * Stream the changes immediately if force_stream_mode is on and the output
+	 * plugin supports streaming. Otherwise wait until size exceeds
+	 * logical_decoding_work_mem.
+	 */
+	bool force_stream = (force_stream_mode && ReorderBufferCanStream(rb));
+
+	/* bail out if force_stream is false and we haven't exceeded the memory limit */
+	if (!force_stream && rb->size < logical_decoding_work_mem * 1024L)
 		return;
 
 	/*
-	 * Loop until we reach under the memory limit.  One might think that just
-	 * by evicting the largest (sub)transaction we will come under the memory
-	 * limit based on assumption that the selected transaction is at least as
-	 * large as the most recent change (which caused us to go over the memory
-	 * limit). However, that is not true because a user can reduce the
+	 * If force_stream is true, loop until there's no change. Otherwise, loop
+	 * until we reach under the memory limit. One might think that just by
+	 * evicting the largest (sub)transaction we will come under the memory limit
+	 * based on assumption that the selected transaction is at least as large as
+	 * the most recent change (which caused us to go over the memory limit).
+	 * However, that is not true because a user can reduce the
 	 * logical_decoding_work_mem to a smaller value before the most recent
 	 * change.
 	 */
-	while (rb->size >= logical_decoding_work_mem * 1024L)
+	while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+		   (force_stream && rb->size > 0))
 	{
 		/*
 		 * Pick the largest transaction (or subtransaction) and evict it from
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 1bf14eec66..015a88abb3 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -1247,6 +1247,16 @@ struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"force_stream_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("Force sending the changes to output plugin immediately if streaming is supported, without waiting till logical_decoding_work_mem."),
+			NULL,
+			GUC_NOT_IN_SAMPLE
+		},
+		&force_stream_mode,
+		false,
+		NULL, NULL, NULL
+	},
 
 	{
 		{"log_duration", PGC_SUSET, LOGGING_WHAT,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc4f9..669dc3a40e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -18,6 +18,7 @@
 #include "utils/timestamp.h"
 
 extern PGDLLIMPORT int logical_decoding_work_mem;
+extern PGDLLIMPORT bool force_stream_mode;
 
 /* an individual tuple, stored in one chunk of memory */
 typedef struct ReorderBufferTupleBuf
-- 
2.24.0.windows.2

