From 06fa715a6d754f5b99f6b94811f65b4f42aed60e Mon Sep 17 00:00:00 2001
From: Shubham Khanna <khannashubham1197@gmail.com>
Date: Mon, 20 May 2024 10:58:31 +0530
Subject: [PATCH v5 1/3] Enable support for 'include_generated_columns' option
 in 'logical replication'

This commit enables support for the 'include_generated_columns' option in
logical replication, allowing the transmission of generated column information
and data alongside regular table changes. This option is particularly useful
for scenarios where applications require access to generated column values for
downstream processing or synchronization.

With this enhancement, users can now include the 'include_generated_columns'
option when querying logical replication slots using either the pgoutput plugin
or the test_decoding plugin. This option, when set to 'true' or '1',
instructs the replication system to include generated column information
and data in the replication stream.

CREATE SUBSCRIPTION test1 connection 'dbname=postgres host=localhost port=9999
'publication pub1;

Usage from test_decoding plugin:
SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL,
                                             'include-xids', '0', 'skip-empty-xacts', '1',
	                                     'include_generated_columns', '1');

Currently copy_data option with include_generated_columns option is not supported.
A future patch will remove this limitation.

This commit aims to enhance the flexibility and utility of logical
replication by allowing users to include generated column information in
replication streams, paving the way for more robust data synchronization and
processing workflows.
---
 .../expected/decoding_into_rel.out            | 25 +++++++++
 .../test_decoding/sql/decoding_into_rel.sql   | 10 +++-
 contrib/test_decoding/test_decoding.c         | 26 +++++++--
 doc/src/sgml/protocol.sgml                    | 14 +++++
 doc/src/sgml/ref/create_subscription.sgml     | 19 +++++++
 src/backend/catalog/pg_publication.c          |  9 +--
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/commands/subscriptioncmds.c       | 31 +++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 ++
 src/backend/replication/logical/proto.c       | 56 +++++++++++++------
 src/backend/replication/logical/relation.c    |  2 +-
 src/backend/replication/logical/worker.c      |  1 +
 src/backend/replication/pgoutput/pgoutput.c   | 42 ++++++++++----
 src/bin/psql/tab-complete.c                   |  2 +-
 src/include/catalog/pg_subscription.h         |  3 +
 src/include/replication/logicalproto.h        | 13 +++--
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/regress/expected/publication.out     |  4 +-
 src/test/regress/sql/publication.sql          |  3 +-
 src/test/subscription/t/011_generated.pl      | 33 ++++++++++-
 src/test/subscription/t/031_column_list.pl    |  4 +-
 22 files changed, 249 insertions(+), 55 deletions(-)

diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out
index 8fd3390066..d4116b0fe6 100644
--- a/contrib/test_decoding/expected/decoding_into_rel.out
+++ b/contrib/test_decoding/expected/decoding_into_rel.out
@@ -103,6 +103,31 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (14 rows)
 
+-- check include-generated-columns option with generated column
+CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED);
+INSERT INTO gencoltable (a) VALUES (1), (2), (3);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '1');
+                            data                             
+-------------------------------------------------------------
+ BEGIN
+ table public.gencoltable: INSERT: a[integer]:1 b[integer]:2
+ table public.gencoltable: INSERT: a[integer]:2 b[integer]:4
+ table public.gencoltable: INSERT: a[integer]:3 b[integer]:6
+ COMMIT
+(5 rows)
+
+INSERT INTO gencoltable (a) VALUES (4), (5), (6);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '0');
+                      data                      
+------------------------------------------------
+ BEGIN
+ table public.gencoltable: INSERT: a[integer]:4
+ table public.gencoltable: INSERT: a[integer]:5
+ table public.gencoltable: INSERT: a[integer]:6
+ COMMIT
+(5 rows)
+
+DROP TABLE gencoltable;
 SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
  ?column? 
 ----------
diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql
index 1068cec588..c40b860f11 100644
--- a/contrib/test_decoding/sql/decoding_into_rel.sql
+++ b/contrib/test_decoding/sql/decoding_into_rel.sql
@@ -39,4 +39,12 @@ SELECT * FROM slot_changes_wrapper('regression_slot');
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
-SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
+-- check include-generated-columns option with generated column
+CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED);
+INSERT INTO gencoltable (a) VALUES (1), (2), (3);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '1');
+INSERT INTO gencoltable (a) VALUES (4), (5), (6);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '0');
+DROP TABLE gencoltable;
+
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
\ No newline at end of file
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 7c50d13969..10ca369d2a 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -31,6 +31,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		include_generated_columns;
 } TestDecodingData;
 
 /*
@@ -168,6 +169,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_generated_columns = true;
 
 	ctx->output_plugin_private = data;
 
@@ -259,6 +261,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-generated-columns") == 0)
+		{
+			if (elem->arg == NULL)
+				data->include_generated_columns = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_generated_columns))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -521,7 +533,8 @@ print_literal(StringInfo s, Oid typid, char *outputstr)
 
 /* print the tuple 'tuple' into the StringInfo s */
 static void
-tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
+tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple,
+					bool skip_nulls, bool include_generated_columns)
 {
 	int			natt;
 
@@ -544,6 +557,9 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 		if (attr->attisdropped)
 			continue;
 
+		if (attr->attgenerated && !include_generated_columns)
+			continue;
+
 		/*
 		 * Don't print system columns, oid will already have been printed if
 		 * present.
@@ -641,7 +657,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.newtuple,
-									false);
+									false, data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			appendStringInfoString(ctx->out, " UPDATE:");
@@ -650,7 +666,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				appendStringInfoString(ctx->out, " old-key:");
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.oldtuple,
-									true);
+									true, data->include_generated_columns );
 				appendStringInfoString(ctx->out, " new-tuple:");
 			}
 
@@ -659,7 +675,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.newtuple,
-									false);
+									false, data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			appendStringInfoString(ctx->out, " DELETE:");
@@ -671,7 +687,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.oldtuple,
-									true);
+									true, data->include_generated_columns);
 			break;
 		default:
 			Assert(false);
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 1b27d0a547..e6fee105de 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -3306,6 +3306,20 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><replaceable class="parameter">include-generated-columns</replaceable></term>
+      <listitem>
+       <para>
+        Boolean option to enable generated columns.
+        The include-generated-columns option controls whether generated
+        columns should be included in the string representation of tuples
+        during logical decoding in PostgreSQL. This allows users to
+        customize the output format based on whether they want to include
+        these columns or not. The default is false.
+       </para>
+      </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term>
       origin
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 740b7d9421..57520b5aef 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -428,6 +428,25 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry id="sql-createsubscription-params-with-include-generated-column">
+        <term><literal>include_generated_column</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the generated columns present in the tables
+          associated with the subscription should be replicated. The default is
+          <literal>false</literal>.
+         </para>
+
+         <para>
+          This parameter can only be set true if <literal>copy_data</literal> is
+          set to <literal>false</literal>. If the subscriber-side column is also a
+          generated column then this option has no effect; the replicated data will
+          be ignored and the subscriber column will be filled as normal with the
+          subscriber-side computed or default data.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 0602398a54..f611148472 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -506,7 +506,6 @@ publication_translate_columns(Relation targetrel, List *columns,
 	Bitmapset  *set = NULL;
 	ListCell   *lc;
 	int			n = 0;
-	TupleDesc	tupdesc = RelationGetDescr(targetrel);
 
 	/* Bail out when no column list defined. */
 	if (!columns)
@@ -534,12 +533,6 @@ publication_translate_columns(Relation targetrel, List *columns,
 					errmsg("cannot use system column \"%s\" in publication column list",
 						   colname));
 
-		if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
-			ereport(ERROR,
-					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-					errmsg("cannot use generated column \"%s\" in publication column list",
-						   colname));
-
 		if (bms_is_member(attnum, set))
 			ereport(ERROR,
 					errcode(ERRCODE_DUPLICATE_OBJECT),
@@ -1232,7 +1225,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			{
 				Form_pg_attribute att = TupleDescAttr(desc, i);
 
-				if (att->attisdropped || att->attgenerated)
+				if (att->attisdropped)
 					continue;
 
 				attnums[nattnums++] = att->attnum;
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9efc9159f2..a090b36465 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->includegeneratedcolumn = subform->subincludegeneratedcolumn;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e407428dbc..8d245722bf 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_INCLUDE_GENERATED_COLUMN		0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -99,6 +100,7 @@ typedef struct SubOpts
 	bool		failover;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	bool		include_generated_column;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -161,6 +163,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->failover = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_INCLUDE_GENERATED_COLUMN))
+		opts->include_generated_column = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -366,6 +370,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_INCLUDE_GENERATED_COLUMN) &&
+				 strcmp(defel->defname, "include_generated_column") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_INCLUDE_GENERATED_COLUMN))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_INCLUDE_GENERATED_COLUMN;
+			opts->include_generated_column = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -446,6 +459,20 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	/*
+	 * Do additional checking for disallowed combination when copy_data and
+	 * include_generated_column are true. COPY of generated columns is not supported
+	 * yet.
+	 */
+	if (opts->copy_data && opts->include_generated_column)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+		/*- translator: both %s are strings of the form "option = value" */
+					errmsg("%s and %s are mutually exclusive options",
+						"copy_data = true", "include_generated_column = true")));
+	}
 }
 
 /*
@@ -603,7 +630,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN |
+					  SUBOPT_INCLUDE_GENERATED_COLUMN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -723,6 +751,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_subincludegeneratedcolumn - 1] = BoolGetDatum(opts.include_generated_column);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 3c2b1bb496..48830b0e10 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -598,6 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.include_generated_column &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfoString(&cmd, ", include_generated_columns 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..7405eb3deb 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -30,10 +30,12 @@
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel,
-								   Bitmapset *columns);
+								   Bitmapset *columns,
+								   bool include_generated_columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
 								   TupleTableSlot *slot,
-								   bool binary, Bitmapset *columns);
+								   bool binary, Bitmapset *columns,
+								   bool include_generated_columns);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+						TupleTableSlot *newslot, bool binary, Bitmapset *columns,
+						bool include_generated_columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+						   include_generated_columns);
 }
 
 /*
@@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns)
+						bool binary, Bitmapset *columns,
+						bool include_generated_columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -478,11 +483,13 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+		logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+							   include_generated_columns);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+						   include_generated_columns);
 }
 
 /*
@@ -532,7 +539,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 void
 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, bool binary,
-						Bitmapset *columns)
+						Bitmapset *columns, bool include_generated_columns)
 {
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -552,7 +559,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+	logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+						   include_generated_columns);
 }
 
 /*
@@ -668,7 +676,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  */
 void
 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
-					 Bitmapset *columns)
+					 Bitmapset *columns, bool include_generated_columns)
 {
 	char	   *relname;
 
@@ -690,7 +698,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel, columns);
+	logicalrep_write_attrs(out, rel, columns, include_generated_columns);
 }
 
 /*
@@ -767,7 +775,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  */
 static void
 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
-					   bool binary, Bitmapset *columns)
+					   bool binary, Bitmapset *columns,
+					   bool include_generated_columns)
 {
 	TupleDesc	desc;
 	Datum	   *values;
@@ -781,7 +790,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !include_generated_columns)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
@@ -802,7 +814,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		Form_pg_type typclass;
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !include_generated_columns)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
@@ -923,7 +938,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
+					   bool include_generated_columns)
 {
 	TupleDesc	desc;
 	int			i;
@@ -938,7 +954,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !include_generated_columns)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
@@ -959,7 +978,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 		uint8		flags = 0;
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !include_generated_columns)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f139e7b01e..5de1531567 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -421,7 +421,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 			int			attnum;
 			Form_pg_attribute attr = TupleDescAttr(desc, i);
 
-			if (attr->attisdropped || attr->attgenerated)
+			if (attr->attisdropped)
 			{
 				entry->attrmap->attnums[i] = -1;
 				continue;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..a662a1f8ff 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4379,6 +4379,7 @@ set_stream_options(WalRcvStreamOptions *options,
 
 	options->proto.logical.twophase = false;
 	options->proto.logical.origin = pstrdup(MySubscription->origin);
+	options->proto.logical.include_generated_column = MySubscription->includegeneratedcolumn;
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d2b35cfb96..7f8715fb29 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -86,7 +86,8 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
-									Bitmapset *columns);
+									Bitmapset *columns,
+									bool include_generated_columns);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -283,11 +284,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		include_generated_columns_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->include_generated_columns = false;
 
 	foreach(lc, options)
 	{
@@ -396,6 +399,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "include_generated_columns") == 0)
+		{
+			if (include_generated_columns_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			include_generated_columns_option_given = true;
+
+			data->include_generated_columns = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -731,11 +744,13 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	{
 		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);
 
-		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns,
+								data->include_generated_columns);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
+	send_relation_and_attrs(relation, xid, ctx, relentry->columns,
+							data->include_generated_columns);
 
 	if (data->in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -749,7 +764,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
 						LogicalDecodingContext *ctx,
-						Bitmapset *columns)
+						Bitmapset *columns, bool include_generated_columns)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -766,7 +781,10 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !include_generated_columns)
 			continue;
 
 		if (att->atttypid < FirstGenbkiObjectId)
@@ -782,7 +800,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation, columns);
+	logicalrep_write_rel(ctx->out, xid, relation, columns, include_generated_columns);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -1085,7 +1103,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 					{
 						Form_pg_attribute att = TupleDescAttr(desc, i);
 
-						if (att->attisdropped || att->attgenerated)
+						if (att->attisdropped)
 							continue;
 
 						nliveatts++;
@@ -1413,7 +1431,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
-
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1531,15 +1548,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+									data->binary, relentry->columns,
+									data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
-									new_slot, data->binary, relentry->columns);
+									new_slot, data->binary, relentry->columns,
+									data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
-									data->binary, relentry->columns);
+									data->binary, relentry->columns,
+									data->include_generated_columns);
 			break;
 		default:
 			Assert(false);
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d453e224d9..e8ff752fd9 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3365,7 +3365,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
 					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "streaming", "synchronous_commit", "two_phase","include_generated_columns");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..4d1d45e811 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subincludegeneratedcolumn;	/* True if generated columns must be published */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -157,6 +159,7 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	bool		includegeneratedcolumn;	/* publish generated column data */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..34ec40b07e 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -225,18 +225,22 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *columns,
+									bool include_generated_columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+									TupleTableSlot *newslot, bool binary,
+									Bitmapset *columns,
+									bool include_generated_columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
 									Relation rel, TupleTableSlot *oldslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *columns,
+									bool include_generated_columns);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
 											  LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
@@ -247,7 +251,8 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel, Bitmapset *columns);
+								 Relation rel, Bitmapset *columns,
+								 bool include_generated_columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..224394cb93 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		include_generated_columns;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 12f71fa99b..fb37720920 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		include_generated_column; /* publish generated columns */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 30b6371134..aa1450315d 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -687,9 +687,9 @@ UPDATE testpub_tbl5 SET a = 1;
 ERROR:  cannot update table "testpub_tbl5"
 DETAIL:  Column list used by the publication does not cover the replica identity.
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
+-- ok: generated columns can be in the list too
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-ERROR:  cannot use generated column "d" in publication column list
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ERROR:  cannot use system column "ctid" in publication column list
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 479d4f3264..b1899ddb1a 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -413,8 +413,9 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
 UPDATE testpub_tbl5 SET a = 1;
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
+-- ok: generated columns can be in the list too
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 -- ok
diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl
index 8b2e5f4708..e7a48a02d3 100644
--- a/src/test/subscription/t/011_generated.pl
+++ b/src/test/subscription/t/011_generated.pl
@@ -24,21 +24,44 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)"
 );
 
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)"
+);
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED, c int)"
 );
 
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b int)"
+);
+
 # data for initial sync
 
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab1 (a) VALUES (1), (2), (3)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 (a) VALUES (1), (2), (3)");
 
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub1 FOR ALL TABLES");
+	"CREATE PUBLICATION pub1 FOR TABLE tab1");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub2 FOR TABLE tab2");
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
 );
 
+my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres', qq(
+	CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (include_generated_column = true)
+));
+ok( $stderr =~
+	  qr/copy_data = true and include_generated_column = true are mutually exclusive options/,
+	'cannot use both include_generated_column and copy_data as true');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (include_generated_column = true, copy_data = false)"
+);
+
 # Wait for initial sync of all subscriptions
 $node_subscriber->wait_for_subscription_sync;
 
@@ -62,6 +85,14 @@ is( $result, qq(1|22|
 4|88|
 6|132|), 'generated columns replicated');
 
+$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (4), (5)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2");
+is( $result, qq(4|8
+5|10), 'generated columns replicated to non-generated column on subscriber');
+
 # try it with a subscriber-side trigger
 
 $node_subscriber->safe_psql(
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9a97fa5020..6e73f892e9 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1202,7 +1202,7 @@ $result = $node_publisher->safe_psql(
 is( $result, qq(t
 t), 'check the number of columns in the old tuple');
 
-# TEST: Generated and dropped columns are not considered for the column list.
+# TEST: Dropped columns are not considered for the column list.
 # So, the publication having a column list except for those columns and a
 # publication without any column (aka all columns as part of the columns
 # list) are considered to have the same column list.
@@ -1211,7 +1211,7 @@ $node_publisher->safe_psql(
 	CREATE TABLE test_mix_4 (a int PRIMARY KEY, b int, c int, d int GENERATED ALWAYS AS (a + 1) STORED);
 	ALTER TABLE test_mix_4 DROP COLUMN c;
 
-	CREATE PUBLICATION pub_mix_7 FOR TABLE test_mix_4 (a, b);
+	CREATE PUBLICATION pub_mix_7 FOR TABLE test_mix_4 (a, b, d);
 	CREATE PUBLICATION pub_mix_8 FOR TABLE test_mix_4;
 
 	-- initial data
-- 
2.41.0.windows.3

