From 1a5cd86c78d7ed83cefbe74f35ffff3db1f568a1 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 10 Dec 2021 14:41:30 +0900
Subject: [PATCH v12 3/3] Add ALTER SUBSCRIPTION ... SKIP to skip the
 transaction on subscriber nodes.

If incoming change violates any constraint, logical replication stops
until it's resolved. This commit introduces another way to skip the
transaction in question, other than manually updating the subscriber's
database or using pg_replication_origin_advance().

The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX),
updating pg_subscription.subskiplsn field, telling the apply worker to
skip the transaction. The apply worker skips all data modification changes
within the specified transaction.

After skipping the transaction the apply worker clears
pg_subscription.subskiplsn.

Author: Masahiko Sawada
Reviewed-by: Vignesh C, Greg Nancarrow, Takamichi Osumi, Haiying Tang, Hou Zhijie, Peter Eisentraut, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                 |  10 +
 doc/src/sgml/logical-replication.sgml      |  26 +-
 doc/src/sgml/ref/alter_subscription.sgml   |  43 +++
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/catalog/system_views.sql       |   3 +-
 src/backend/commands/subscriptioncmds.c    |  70 +++++
 src/backend/parser/gram.y                  |   9 +
 src/backend/replication/logical/worker.c   | 336 ++++++++++++++++-----
 src/bin/pg_dump/pg_dump.c                  |   4 +
 src/bin/psql/describe.c                    |   8 +-
 src/bin/psql/tab-complete.c                |   5 +-
 src/include/catalog/pg_subscription.h      |   5 +
 src/include/nodes/parsenodes.h             |   3 +-
 src/test/regress/expected/subscription.out | 101 ++++---
 src/test/regress/sql/subscription.sql      |   6 +
 src/test/subscription/t/029_skip_xact.pl   | 182 +++++++++++
 16 files changed, 685 insertions(+), 127 deletions(-)
 create mode 100644 src/test/subscription/t/029_skip_xact.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 83987a9904..89be2b7682 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7769,6 +7769,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subskiplsn</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       Commit LSN of the transaction whose changes are to be skipped, if a valid
+       LSN; otherwise <literal>0/0</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 57272e641e..d34b4485f5 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -361,13 +361,25 @@ CONTEXT:  processing remote data during "INSERT" for replication target relation
 </screen>
    The LSN of the transaction that contains the change violating the constraint and
    the replication origin name can be found from those outputs (LSN 0/14C0378 and
-   replication origin <literal>pg_16395</literal> in the above case).  The transaction
-   can be skipped by calling the <link linkend="pg-replication-origin-advance">
-   <function>pg_replication_origin_advance()</function></link> function with
-   the <parameter>node_name</parameter> and the next LSN of the commit LSN
-   (i.e., 0/14C0379) from those outputs.  The current position of origins can be
-   seen in the <link linkend="view-pg-replication-origin-status">
-   <structname>pg_replication_origin_status</structname></link> system view.
+   replication origin <literal>pg_16395</literal> in the above case).
+  </para>
+
+  <para>
+   The resolution can be done by changing data or permissions on the subscriber so
+   that it does not conflict with incoming changes, by dropping the conflicting constraint
+   or unique index, or by writing a trigger on the subscriber to suppress or redirect
+   conflicting incoming changes, or as a last resort, by skipping the whole transaction.
+  </para>
+
+  <para>
+   The whole transaction can be skipped by using <command>ALTER SUBSCRIPTION ... SKIP</command>
+   with the commit LSN on the subscription.  Alternatively, the transaction can also be
+   skipped by calling the <link linkend="pg-replication-origin-advance">
+   <function>pg_replication_origin_advance()</function></link> function with the
+   <parameter>node_name</parameter> and the next LSN of the commit LSN
+   (i.e., 0/14C0379) from those outputs.  Please note that skipping the whole transaction
+   include skipping changes that might not violate any constraint.  This can easily make
+   the subscriber inconsistent.
   </para>
  </sect1>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0d6f064f58..f974511c1c 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 </synopsis>
@@ -210,6 +211,48 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )</literal></term>
+    <listitem>
+     <para>
+      Skips applying all changes of the specified remote transaction.  If incoming data
+      violates any constraints, logical replication will stop until it is
+      resolved.  The resolution can be done either by changing data on the
+      subscriber so that it doesn't conflict with incoming changes or by skipping
+      the whole transaction.  Using the <command>ALTER SUBSCRIPTION ... SKIP</command>
+      command, the logical replication worker skips all data modification changes
+      within the specified transaction, including changes that might not violate
+      the constraint, so, it should only be used as a last resort. This option has
+      no effect on the transactions that are already prepared by enabling
+      <literal>two_phase</literal> on subscriber.  After logical replication
+      successfully skips the transaction or commits non-empty transaction,
+      the LSN (stored in
+      <structname>pg_subscription</structname>.<structfield>subskiplsn</structfield>)
+      is cleared.  See <xref linkend="logical-replication-conflicts"/> for
+      the details of logical replication conflicts.
+     </para>
+
+     <para>
+      <replaceable>skip_option</replaceable> specifies options for this operation.
+      The supported option is:
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>lsn</literal> (<type>pg_lsn</type>)</term>
+        <listitem>
+         <para>
+          Specifies the commit LSN of the remote transaction whose changes are to be skipped
+          by the logical replication worker.  Skipping
+          individual subtransactions is not supported.  Setting <literal>NONE</literal>
+          resets the LSN.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">new_owner</replaceable></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..2139ebd0e0 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->skiplsn = subform->subskiplsn;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca5a9..673d0bc7ba 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+              substream, subtwophasestate, subskiplsn, subslotname, subsynccommit,
+              subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..cb80d41494 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -45,6 +45,7 @@
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/syscache.h"
 
 /*
@@ -61,6 +62,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_LSN					0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +84,8 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	XLogRecPtr	lsn;			/* InvalidXLogRecPtr for resetting purpose,
+								 * otherwise a valid LSN */
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -249,6 +253,35 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_LSN) &&
+				 strcmp(defel->defname, "lsn") == 0)
+		{
+			char	   *lsn_str = defGetString(defel);
+			XLogRecPtr	lsn;
+
+			if (IsSet(opts->specified_opts, SUBOPT_LSN))
+				errorConflictingDefElem(defel, pstate);
+
+			if (strcmp(lsn_str, "none") == 0)
+			{
+				/* Setting lsn = NONE is treated as resetting LSN */
+				lsn = InvalidXLogRecPtr;
+			}
+			else
+			{
+				/* Parse the argument as LSN */
+				lsn = DatumGetTransactionId(DirectFunctionCall1(pg_lsn_in,
+																CStringGetDatum(lsn_str)));
+
+				if (XLogRecPtrIsInvalid(lsn))
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid WAL location (LSN): %s", lsn_str)));
+			}
+
+			opts->specified_opts |= SUBOPT_LSN;
+			opts->lsn = lsn;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -464,6 +497,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
+	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1083,6 +1117,42 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SKIP:
+			{
+				parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
+
+				/* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
+				Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
+
+				/*
+				 * If the user set subskiplsn, we do a sanity check to make
+				 * sure that the specified LSN is a probable value.
+				 */
+				if (!XLogRecPtrIsInvalid(opts.lsn))
+				{
+					RepOriginId originid;
+					char		originname[NAMEDATALEN];
+					XLogRecPtr	remote_lsn;
+
+					snprintf(originname, sizeof(originname), "pg_%u", subid);
+					originid = replorigin_by_name(originname, false);
+					remote_lsn = replorigin_get_progress(originid, false);
+
+					/* Check the given LSN is at least a future LSN */
+					if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
+						ereport(ERROR,
+								(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+								 errmsg("skip WAL location (LSN) must be greater than origin LSN %X/%X",
+										LSN_FORMAT_ARGS(remote_lsn))));
+				}
+
+				values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
+				replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+				update_tuple = true;
+				break;
+			}
+
 		default:
 			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
 				 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a03b33b53b..0036c2f9e2 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9983,6 +9983,15 @@ AlterSubscriptionStmt:
 											(Node *)makeBoolean(false), @1));
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name SKIP definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_SKIP;
+					n->subname = $3;
+					n->options = $5;
+					$$ = (Node *)n;
+				}
 		;
 
 /*****************************************************************************
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a159561e31..91a7eaffe1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -189,6 +190,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/syscache.h"
@@ -261,6 +263,21 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
+/*
+ * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
+ * the subscription if the remote transaction's commit LSN matches the subskiplsn.
+ * Once we start skipping changes, we don't stop it until we skip all changes of
+ * the transaction even if pg_subscription is updated and MySubscription->skiplsn
+ * gets changed or reset during that.  Also, in streaming transaction cases, we
+ * don't skip receiving and spooling the changes, since we decide whether or not
+ * to skip applying the changes when starting to apply changes.  The subskiplsn is
+ * cleared after successfully skipping the transaction or applying non-empty
+ * transaction, where the later avoids the mistakenly specified subskiplsn from
+ * being left.
+ */
+static XLogRecPtr skip_xact_commit_lsn = InvalidXLogRecPtr;
+#define is_skipping_changes() (!XLogRecPtrIsInvalid(skip_xact_commit_lsn))
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -307,10 +324,13 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+static void apply_worker_post_transaction(bool empty_tx, XLogRecPtr origin_lsn,
+										  TimestampTz origin_ts);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
-static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
+static bool apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
@@ -336,6 +356,12 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
 /* Common streaming function to apply all the spooled messages */
 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(XLogRecPtr lsn);
+static void stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_ts);
+static void clear_subscription_skip_lsn(XLogRecPtr skiplsn, XLogRecPtr origin_lsn,
+										TimestampTz origin_ts);
+
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn,
@@ -797,6 +823,8 @@ apply_handle_begin(StringInfo s)
 
 	remote_final_lsn = begin_data.final_lsn;
 
+	maybe_start_skipping_changes(begin_data.final_lsn);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -811,6 +839,7 @@ static void
 apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
+	bool		committed;
 
 	logicalrep_read_commit(s, &commit_data);
 
@@ -821,13 +850,10 @@ apply_handle_commit(StringInfo s)
 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	apply_handle_commit_internal(&commit_data);
-
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(commit_data.end_lsn);
+	committed = apply_handle_commit_internal(&commit_data);
 
-	pgstat_report_activity(STATE_IDLE, NULL);
-	reset_apply_error_context_info();
+	apply_worker_post_transaction(committed, commit_data.end_lsn,
+								  commit_data.committime);
 }
 
 /*
@@ -850,6 +876,8 @@ apply_handle_begin_prepare(StringInfo s)
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
+	maybe_start_skipping_changes(begin_data.prepare_lsn);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -908,9 +936,9 @@ apply_handle_prepare(StringInfo s)
 
 	/*
 	 * Unlike commit, here, we always prepare the transaction even though no
-	 * change has happened in this transaction. It is done this way because at
-	 * commit prepared time, we won't know whether we have skipped preparing a
-	 * transaction because of no change.
+	 * change has happened in this transaction or all changes are skipped. It
+	 * is done this way because at commit prepared time, we won't know whether
+	 * we have skipped preparing a transaction because of no change.
 	 *
 	 * XXX, We can optimize such that at commit prepared time, we first check
 	 * whether we have prepared the transaction or not but that doesn't seem
@@ -924,15 +952,14 @@ apply_handle_prepare(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn);
-
-	in_remote_transaction = false;
-
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
-
-	pgstat_report_activity(STATE_IDLE, NULL);
-	reset_apply_error_context_info();
+	/*
+	 * Do the post transaction work and cleanup.  Since we already have
+	 * prepared the transaction, in a case where the server crashes before
+	 * clearing the subskiplsn, it will be left but the transaction won't be
+	 * resent.  But that's okay because it will be cleared when starting to
+	 * apply the next transaction.
+	 */
+	apply_worker_post_transaction(false, prepare_data.end_lsn, prepare_data.prepare_time);
 }
 
 /*
@@ -965,16 +992,8 @@ apply_handle_commit_prepared(StringInfo s)
 	FinishPreparedTransaction(gid, true);
 	end_replication_step();
 	CommitTransactionCommand();
-	pgstat_report_stat(false);
-
-	store_flush_position(prepare_data.end_lsn);
-	in_remote_transaction = false;
-
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
 
-	pgstat_report_activity(STATE_IDLE, NULL);
-	reset_apply_error_context_info();
+	apply_worker_post_transaction(true, prepare_data.end_lsn, prepare_data.commit_time);
 }
 
 /*
@@ -985,6 +1004,7 @@ apply_handle_rollback_prepared(StringInfo s)
 {
 	LogicalRepRollbackPreparedTxnData rollback_data;
 	char		gid[GIDSIZE];
+	bool		finish_prepared = false;
 
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn,
@@ -1015,18 +1035,12 @@ apply_handle_rollback_prepared(StringInfo s)
 		FinishPreparedTransaction(gid, false);
 		end_replication_step();
 		CommitTransactionCommand();
+		finish_prepared = true;
 	}
 
-	pgstat_report_stat(false);
-
-	store_flush_position(rollback_data.rollback_end_lsn);
-	in_remote_transaction = false;
-
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(rollback_data.rollback_end_lsn);
-
-	pgstat_report_activity(STATE_IDLE, NULL);
-	reset_apply_error_context_info();
+	apply_worker_post_transaction(finish_prepared,
+								  rollback_data.rollback_end_lsn,
+								  rollback_data.rollback_time);
 }
 
 /*
@@ -1066,21 +1080,10 @@ apply_handle_stream_prepare(StringInfo s)
 
 	CommitTransactionCommand();
 
-	pgstat_report_stat(false);
-
-	store_flush_position(prepare_data.end_lsn);
-
-	in_remote_transaction = false;
-
 	/* unlink the files with serialized changes and subxact info. */
 	stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
 
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
-
-	pgstat_report_activity(STATE_IDLE, NULL);
-
-	reset_apply_error_context_info();
+	apply_worker_post_transaction(false, prepare_data.end_lsn, prepare_data.prepare_time);
 }
 
 /*
@@ -1341,6 +1344,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 
 	remote_final_lsn = lsn;
 
+	maybe_start_skipping_changes(lsn);
+
 	/*
 	 * Make sure the handle apply_dispatch methods are aware we're in a remote
 	 * transaction.
@@ -1429,6 +1434,7 @@ apply_handle_stream_commit(StringInfo s)
 {
 	TransactionId xid;
 	LogicalRepCommitData commit_data;
+	bool		committed;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1442,23 +1448,20 @@ apply_handle_stream_commit(StringInfo s)
 
 	apply_spooled_messages(xid, commit_data.commit_lsn);
 
-	apply_handle_commit_internal(&commit_data);
+	committed = apply_handle_commit_internal(&commit_data);
 
 	/* unlink the files with serialized changes and subxact info */
 	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 
-	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(commit_data.end_lsn);
-
-	pgstat_report_activity(STATE_IDLE, NULL);
-
-	reset_apply_error_context_info();
+	apply_worker_post_transaction(committed, commit_data.end_lsn,
+								  commit_data.committime);
 }
 
 /*
  * Helper function for apply_handle_commit and apply_handle_stream_commit.
+ * Return true if the transaction was committed, otherwise return false.
  */
-static void
+static bool
 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 {
 	if (IsTransactionState())
@@ -1471,18 +1474,14 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		replorigin_session_origin_timestamp = commit_data->committime;
 
 		CommitTransactionCommand();
-		pgstat_report_stat(false);
-
-		store_flush_position(commit_data->end_lsn);
-	}
-	else
-	{
-		/* Process any invalidation messages that might have accumulated. */
-		AcceptInvalidationMessages();
-		maybe_reread_subscription();
+		return true;
 	}
 
-	in_remote_transaction = false;
+	/* Process any invalidation messages that might have accumulated. */
+	AcceptInvalidationMessages();
+	maybe_reread_subscription();
+
+	return false;
 }
 
 /*
@@ -2376,6 +2375,17 @@ apply_dispatch(StringInfo s)
 	LogicalRepMsgType action = pq_getmsgbyte(s);
 	LogicalRepMsgType saved_command;
 
+	/*
+	 * Skip all data-modification changes if we're skipping changes of this
+	 * transaction.
+	 */
+	if (is_skipping_changes() &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE))
+		return;
+
 	/*
 	 * Set the current command being applied. Since this function can be
 	 * called recursively when applying spooled changes, save the current
@@ -3672,6 +3682,196 @@ IsLogicalWorker(void)
 	return MyLogicalRepWorker != NULL;
 }
 
+/*
+ * Post-transaction work for apply workers.
+ *
+ * tx_finished is true if the caller have finished the transaction with updating
+ * the replication origin so the same transaction won't be resent in case of
+ * a crash.  Both origin_lsn and origin_timestamp are the remote transaction's
+ * end_lsn and commit timestamp, respectively.
+ */
+static void
+apply_worker_post_transaction(bool tx_finished, XLogRecPtr origin_lsn,
+							  TimestampTz origin_ts)
+{
+	if (unlikely(is_skipping_changes()))
+	{
+		/*
+		 * If we are skipping all changes of this transaction, we stop it and
+		 * clear subskiplsn of pg_subscription.
+		 */
+		stop_skipping_changes(origin_lsn, origin_ts);
+	}
+	else if (unlikely(tx_finished && !XLogRecPtrIsInvalid(MySubscription->skiplsn)))
+	{
+		/*
+		 * The subskiplsn was specified but we successfully finished non-empty
+		 * transaction. In this case, it's possible that the user mistakenly
+		 * specified the wrong subskiplsn so raise an warning and clear it.
+		 */
+		ereport(WARNING,
+				errmsg("remote transaction's commit WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
+					   LSN_FORMAT_ARGS(origin_lsn),
+					   LSN_FORMAT_ARGS(MySubscription->skiplsn)));
+
+		clear_subscription_skip_lsn(MySubscription->skiplsn, origin_lsn, origin_ts);
+	}
+
+	Assert(!IsTransactionState());
+	Assert(!is_skipping_changes());
+
+	pgstat_report_stat(false);
+
+	store_flush_position(origin_lsn);
+
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(origin_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	reset_apply_error_context_info();
+}
+
+/*
+ * Start skipping changes of the transaction if the given commit LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void
+maybe_start_skipping_changes(XLogRecPtr lsn)
+{
+	Assert(!is_skipping_changes());
+	Assert(!in_remote_transaction);
+	Assert(!in_streamed_transaction);
+
+	if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn)))
+		return;
+
+	if (likely(MySubscription->skiplsn != lsn))
+	{
+		/*
+		 * It's a rare case; a past subskiplsn was left because the server
+		 * crashed after preparing the transaction and before clearing the
+		 * subskiplsn. We clear it without a warning message so as not confuse
+		 * the user.
+		 */
+		if (unlikely(MySubscription->skiplsn < lsn))
+			clear_subscription_skip_lsn(MySubscription->skiplsn, InvalidXLogRecPtr, 0);
+
+		return;
+	}
+
+	/* Start skipping all changes of this transaction */
+	skip_xact_commit_lsn = lsn;
+
+	ereport(LOG,
+			errmsg("start skipping logical replication transaction which committed at %X/%X",
+				   LSN_FORMAT_ARGS(skip_xact_commit_lsn)));
+}
+
+/*
+ * Stop skipping changes by resetting skip_xact_commit_lsn.  Both origin_lsn and
+ * origin_timestamp are used to update origin state when clearing subskiplsn so
+ * that we can restart streaming from correct position in case of crash.
+ */
+static void
+stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_ts)
+{
+	Assert(is_skipping_changes());
+
+	clear_subscription_skip_lsn(skip_xact_commit_lsn, origin_lsn, origin_ts);
+
+	/* Make sure that clearing subskiplsn is committed */
+	if (IsTransactionState())
+		CommitTransactionCommand();
+
+	ereport(LOG,
+			(errmsg("done skipping logical replication transaction which committed at %X/%X",
+					LSN_FORMAT_ARGS(skip_xact_commit_lsn))));
+
+	/* Stop skipping changes */
+	skip_xact_commit_lsn = InvalidXLogRecPtr;
+}
+
+/* Clear subskiplsn of pg_subscription catalog */
+static void
+clear_subscription_skip_lsn(XLogRecPtr skiplsn, XLogRecPtr origin_lsn, TimestampTz origin_ts)
+{
+	Relation	rel;
+	Form_pg_subscription subform;
+	HeapTuple	tup;
+	bool		started_tx = false;
+
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+
+	/*
+	 * Protect subskiplsn of pg_subscription from being concurrently updated
+	 * while clearing it.
+	 */
+	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+					 AccessShareLock);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+	/*
+	 * Update the subskiplsn of the tuple to InvalidXLogRecPtr.  If user has
+	 * already changed subskiplsn before clearing it we don't update the
+	 * catalog and don't advance the replication origin state.  So in the
+	 * worst case, if the server crashes before sending an acknowledgment of
+	 * the flush position the transaction will be sent again and the user
+	 * needs to set subskiplsn again.  We can reduce the possibility by
+	 * logging a replication origin WAL record to advance the origin LSN
+	 * instead but there is no way to advance the origin timestamp and it
+	 * doesn't seem to be worth doing anything about it since it's a very rare
+	 * case.
+	 */
+	if (subform->subskiplsn == skiplsn)
+	{
+		bool		nulls[Natts_pg_subscription];
+		bool		replaces[Natts_pg_subscription];
+		Datum		values[Natts_pg_subscription];
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, false, sizeof(nulls));
+		memset(replaces, false, sizeof(replaces));
+
+		/* reset subskiplsn */
+		values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+		replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = origin_lsn;
+		replorigin_session_origin_timestamp = origin_ts;
+
+		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+								replaces);
+		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	}
+
+	heap_freetuple(tup);
+	table_close(rel, NoLock);
+
+	if (started_tx)
+		CommitTransactionCommand();
+}
+
 /* Error callback to give more context info about the change being applied */
 static void
 apply_error_callback(void *arg)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e69dcf8a48..dc3b28660d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4355,6 +4355,10 @@ getSubscriptions(Archive *fout)
 
 	ntups = PQntuples(res);
 
+	/*
+	 * Get subscription fields. We don't include subskiplsn in the dump as
+	 * after restoring the dump this value may no longer be relevant.
+	 */
 	i_tableoid = PQfnumber(res, "tableoid");
 	i_oid = PQfnumber(res, "oid");
 	i_subname = PQfnumber(res, "subname");
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index e3382933d9..1750b71a4a 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false};
+	false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6129,6 +6129,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 						  ",  subconninfo AS \"%s\"\n",
 						  gettext_noop("Synchronous commit"),
 						  gettext_noop("Conninfo"));
+
+		/* Skip LSN is only supported in v15 and higher */
+		if (pset.sversion >= 150000)
+			appendPQExpBuffer(&buf,
+							  ", subskiplsn AS \"%s\"\n",
+							  gettext_noop("Skip LSN"));
 	}
 
 	/* Only display subscriptions in current database. */
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6957567264..604047f341 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1819,7 +1819,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
 		COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
-					  "RENAME TO", "REFRESH PUBLICATION", "SET",
+					  "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (",
 					  "ADD PUBLICATION", "DROP PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
@@ -1835,6 +1835,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+	/* ALTER SUBSCRIPTION <name> SKIP ( */
+	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
+		COMPLETE_WITH("lsn");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..89a5861d19 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
+	XLogRecPtr	subskiplsn;		/* All changes which committed at this LSN are
+								 * skipped */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,8 @@ typedef struct Subscription
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	XLogRecPtr	skiplsn;		/* All changes which committed at this LSN are
+								 * skipped */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1617702d9d..6f83a79a96 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3726,7 +3726,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
 	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
-	ALTER_SUBSCRIPTION_ENABLED
+	ALTER_SUBSCRIPTION_ENABLED,
+	ALTER_SUBSCRIPTION_SKIP
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562..4710d53698 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -93,11 +93,16 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2
 ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -129,10 +134,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
+                                                                                 List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +170,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +193,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +220,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +238,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +275,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +287,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +299,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af1e4..753be1f323 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -72,6 +72,12 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
 ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+
 \dRs+
 
 BEGIN;
diff --git a/src/test/subscription/t/029_skip_xact.pl b/src/test/subscription/t/029_skip_xact.pl
new file mode 100644
index 0000000000..833088cf86
--- /dev/null
+++ b/src/test/subscription/t/029_skip_xact.pl
@@ -0,0 +1,182 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Tests for skipping logical replication transactions
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 4;
+use Time::HiRes qw(usleep);
+
+my $offset = 0;
+
+# Test skipping the transaction. This function must be called after the caller
+# has inserted data that conflicts with the subscriber.  The commit-LSN of the
+# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is
+# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we
+# check if logical replication can continue working by inserting $nonconflict_data
+# on the publisher.
+sub test_skip_xact
+{
+	my ($node_publisher, $node_subscriber, $subname, $relname,
+		$nonconflict_data, $expected, $msg)
+	  = @_;
+
+	# Wait until a conflict occurs on the subscriber.
+	$node_subscriber->wait_for_log(
+		qr/CONTEXT:  processing remote data during "INSERT" for replication target relation/,
+		$offset);
+
+	# Get the commit-LSN of the error transaction.
+	my $contents = slurp_file($node_subscriber->logfile, $offset);
+	$contents =~
+	  qr/processing remote data during "INSERT" for replication target relation "public.$relname" in transaction \d+ committed at LSN ([[:xdigit:]]+\/[[:xdigit:]]+)/
+	  or die "could not get error-LSN";
+	my $lsn = $1;
+
+	# Set skip lsn
+	$node_subscriber->safe_psql('postgres',
+		"ALTER SUBSCRIPTION $subname SKIP (lsn = '$lsn')");
+
+	# Restart the subscriber node to restart logical replication with no interval
+	$node_subscriber->restart;
+
+	# Wait for the failed transaction to be skipped
+	$node_subscriber->poll_query_until('postgres',
+		"SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = '$subname'"
+	);
+
+	# Wait for the log indicating that successfully skipped the transaction, and
+	# advance the offset of the log file for the next test.
+	$offset = $node_subscriber->wait_for_log(
+		qr/LOG:  done skipping logical replication transaction which committed at $lsn/,
+		$offset);
+
+	# Insert non-conflict data
+	$node_publisher->safe_psql('postgres',
+		"INSERT INTO $relname VALUES $nonconflict_data");
+
+	$node_publisher->wait_for_catchup($subname);
+
+	# Check replicated data
+	my $res = $node_subscriber->safe_psql('postgres',
+		"SELECT count(*) FROM $relname");
+	is($res, $expected, $msg);
+}
+
+# Create publisher node. Set a low value to logical_decoding_work_mem
+# so we can test streaming cases easily.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+	'postgresql.conf',
+	qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+	'postgresql.conf',
+	qq[
+max_prepared_transactions = 10
+]);
+
+# The subscriber will enter an infinite error loop, so we don't want
+# to overflow the server log with error messages.
+$node_subscriber->append_conf(
+	'postgresql.conf',
+	qq[
+wal_retrieve_retry_interval = 2s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber, we
+# create the same tables but with primary keys. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+CREATE TABLE test_tab (a int);
+CREATE TABLE test_tab_streaming (a int, b text);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+CREATE TABLE test_tab (a int primary key);
+CREATE TABLE test_tab_streaming (a int primary key, b text);
+INSERT INTO test_tab VALUES (1);
+INSERT INTO test_tab_streaming VALUES (1, md5(1::text));
+COMMIT;
+]);
+
+# Setup publications
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+CREATE PUBLICATION tap_pub FOR TABLE test_tab;
+CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming;
+]);
+
+# Create subscriptions
+$node_subscriber->safe_psql(
+	'postgres',
+	qq[
+CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (two_phase = on);
+CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr' PUBLICATION tap_pub_streaming WITH (streaming = on);
+]);
+
+$node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_streaming');
+
+# Insert data to test_tab1, raising an error on the subscriber due to violation
+# of the unique constraint on test_tab. Then skip the transaction.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	"(2)", "2", "test skipping transaction");
+
+# Test for PREPARE and COMMIT PREPARED. Insert the same data to test_tab1 and
+# PREPARE the transaction, raising an error. Then skip the transaction.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	"(3)", "3", "test skipping prepare and commit prepared ");
+
+# Test for STREAM COMMIT. Insert enough rows to test_tab_streaming to exceed the 64kB
+# limit, also raising an error on the subscriber during applying spooled changes for the
+# same reason. Then skip the transaction.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub_streaming",
+	"test_tab_streaming", "(2, md5(2::text))",
+	"2", "test skipping stream-commit");
+
+my $res = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts");
+is($res, "0",
+	"check all prepared transactions are resolved on the subscriber");
-- 
2.24.3 (Apple Git-128)

