From d52ebbf85ac34cf47285206d7bbe54fe9e01a2f5 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Mon, 20 May 2024 10:58:31 +0530
Subject: [PATCH v2] Support generated column capturing generated column data
 using pgoutput and test_decoding plugin

Now if include_generated_columns option is specified, the generated
column information and generated column data also will be sent.
Usage from pgoutput plugin:
SELECT * FROM pg_logical_slot_peek_binary_changes('slot1', NULL, NULL,
'proto_version', '1', 'publication_names', 'pub1',
'include_generated_columns', 'true');

Usage from test_decoding plugin:
SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '1',
'include_generated_columns', '1');

New option generated_option is added in create subscription. Now if this
option is specified as 'true' during create subscription, generated
columns in the tables, present in publisher (to which this subscription is
subscribed) can also be replicated.
---
 contrib/test_decoding/expected/ddl.out        | 14 +++++
 contrib/test_decoding/sql/ddl.sql             |  6 ++
 contrib/test_decoding/test_decoding.c         | 25 +++++++--
 doc/src/sgml/ref/create_subscription.sgml     | 19 +++++++
 src/backend/catalog/pg_publication.c          |  8 +--
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/commands/subscriptioncmds.c       | 39 ++++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 ++
 src/backend/replication/logical/proto.c       | 56 +++++++++++++------
 src/backend/replication/logical/relation.c    |  2 +-
 src/backend/replication/logical/worker.c      |  1 +
 src/backend/replication/pgoutput/pgoutput.c   | 41 ++++++++++----
 src/include/catalog/pg_subscription.h         |  3 +
 src/include/replication/logicalproto.h        | 13 +++--
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/regress/expected/publication.out     |  4 +-
 src/test/regress/sql/publication.sql          |  3 +-
 src/test/subscription/t/031_column_list.pl    |  4 +-
 19 files changed, 194 insertions(+), 51 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 5713b8ab1c..d10bb2d2fc 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -831,6 +831,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 data
 (0 rows)
 \pset format aligned
+-- check include_generated_columns option with generated column
+CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED);
+INSERT INTO gencoltable (a) VALUES (1), (2), (3);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1');
+                            data                             
+-------------------------------------------------------------
+ BEGIN
+ table public.gencoltable: INSERT: a[integer]:1 b[integer]:2
+ table public.gencoltable: INSERT: a[integer]:2 b[integer]:4
+ table public.gencoltable: INSERT: a[integer]:3 b[integer]:6
+ COMMIT
+(5 rows)
+
+DROP TABLE gencoltable;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index 2f8e4e7f2c..d688775580 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -437,6 +437,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 \pset format aligned
 
+-- check include_generated_columns option with generated column
+CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED);
+INSERT INTO gencoltable (a) VALUES (1), (2), (3);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1');
+DROP TABLE gencoltable;
+
 SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 7c50d13969..f15ff93ac3 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -31,6 +31,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		only_local;
+	bool		include_generated_columns;
 } TestDecodingData;
 
 /*
@@ -259,6 +260,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include_generated_columns") == 0)
+		{
+			if (elem->arg == NULL)
+				continue;
+			else if (!parse_bool(strVal(elem->arg), &data->include_generated_columns))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -521,7 +532,8 @@ print_literal(StringInfo s, Oid typid, char *outputstr)
 
 /* print the tuple 'tuple' into the StringInfo s */
 static void
-tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
+tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple,
+					bool skip_nulls, bool include_generated_columns)
 {
 	int			natt;
 
@@ -544,6 +556,9 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
 		if (attr->attisdropped)
 			continue;
 
+		if (attr->attgenerated && !include_generated_columns)
+			continue;
+
 		/*
 		 * Don't print system columns, oid will already have been printed if
 		 * present.
@@ -641,7 +656,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.newtuple,
-									false);
+									false, data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			appendStringInfoString(ctx->out, " UPDATE:");
@@ -650,7 +665,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				appendStringInfoString(ctx->out, " old-key:");
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.oldtuple,
-									true);
+									true, data->include_generated_columns );
 				appendStringInfoString(ctx->out, " new-tuple:");
 			}
 
@@ -659,7 +674,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.newtuple,
-									false);
+									false, data->include_generated_columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			appendStringInfoString(ctx->out, " DELETE:");
@@ -671,7 +686,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				tuple_to_stringinfo(ctx->out, tupdesc,
 									change->data.tp.oldtuple,
-									true);
+									true, data->include_generated_columns);
 			break;
 		default:
 			Assert(false);
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 740b7d9421..737939f377 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -428,6 +428,25 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry id="sql-createsubscription-params-with-generated-column">
+        <term><literal>generated-column</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the generated columns present in the tables
+          associated with the subscription should be replicated. The default is
+          <literal>false</literal>.
+         </para>
+
+         <para>
+          This parameter can only be set true if copy_data is set to false.
+          This option works fine when a generated column (in publisher) is replicated to a
+          non-generated column (in subscriber). Else if it is replicated to a generated
+          column, it will ignore the replicated data and fill the column with computed or
+          default data.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 0602398a54..2acb574ac8 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -534,12 +534,6 @@ publication_translate_columns(Relation targetrel, List *columns,
 					errmsg("cannot use system column \"%s\" in publication column list",
 						   colname));
 
-		if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
-			ereport(ERROR,
-					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
-					errmsg("cannot use generated column \"%s\" in publication column list",
-						   colname));
-
 		if (bms_is_member(attnum, set))
 			ereport(ERROR,
 					errcode(ERRCODE_DUPLICATE_OBJECT),
@@ -1232,7 +1226,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			{
 				Form_pg_attribute att = TupleDescAttr(desc, i);
 
-				if (att->attisdropped || att->attgenerated)
+				if (att->attisdropped)
 					continue;
 
 				attnums[nattnums++] = att->attnum;
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9efc9159f2..260fba228a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->generatedcolumn = subform->subgeneratedcolumn;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e407428dbc..ccd0998d17 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_GENERATED_COLUMN		0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -99,6 +100,7 @@ typedef struct SubOpts
 	bool		failover;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	bool		generated_column;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -161,6 +163,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->failover = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_GENERATED_COLUMN))
+		opts->generated_column = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -366,6 +370,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_GENERATED_COLUMN) &&
+				 strcmp(defel->defname, "generated_column") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_GENERATED_COLUMN))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_GENERATED_COLUMN;
+			opts->generated_column = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -446,6 +459,19 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	/*
+	 * Do additional checking for disallowed combination when copy_data and
+	 * generated_column are true. COPY of generated columns is not supported yet.
+	 */
+	if (opts->copy_data && opts->generated_column)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+		/*- translator: both %s are strings of the form "option = value" */
+					errmsg("%s and %s are mutually exclusive options",
+						"copy_data = true", "generated_column = true")));
+	}
 }
 
 /*
@@ -603,7 +629,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN |
+					  SUBOPT_GENERATED_COLUMN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -723,6 +750,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_subgeneratedcolumn - 1] = BoolGetDatum(opts.generated_column);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -1146,7 +1174,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_GENERATED_COLUMN);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1263,6 +1291,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_GENERATED_COLUMN))
+				{
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("toggling generated_column option is not allowed.")));
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 3c2b1bb496..fa00cdc901 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -598,6 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.generated_column &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfoString(&cmd, ", include_generated_columns 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..38da3a5ca2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -30,10 +30,12 @@
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel,
-								   Bitmapset *columns);
+								   Bitmapset *columns,
+								   bool publish_generated_column);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
 								   TupleTableSlot *slot,
-								   bool binary, Bitmapset *columns);
+								   bool binary, Bitmapset *columns,
+								   bool publish_generated_column);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+						TupleTableSlot *newslot, bool binary, Bitmapset *columns,
+						bool publish_generated_column)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+						   publish_generated_column);
 }
 
 /*
@@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns)
+						bool binary, Bitmapset *columns,
+						bool publish_generated_column)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -478,11 +483,13 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+		logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+							   publish_generated_column);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+						   publish_generated_column);
 }
 
 /*
@@ -532,7 +539,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 void
 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, bool binary,
-						Bitmapset *columns)
+						Bitmapset *columns, bool publish_generated_column)
 {
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -552,7 +559,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, columns);
+	logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+						   publish_generated_column);
 }
 
 /*
@@ -668,7 +676,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  */
 void
 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
-					 Bitmapset *columns)
+					 Bitmapset *columns, bool publish_generated_column)
 {
 	char	   *relname;
 
@@ -690,7 +698,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel, columns);
+	logicalrep_write_attrs(out, rel, columns, publish_generated_column);
 }
 
 /*
@@ -767,7 +775,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  */
 static void
 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
-					   bool binary, Bitmapset *columns)
+					   bool binary, Bitmapset *columns,
+					   bool publish_generated_column)
 {
 	TupleDesc	desc;
 	Datum	   *values;
@@ -781,12 +790,15 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
 			continue;
 
+		if (att->attgenerated && !publish_generated_column)
+			continue;
+
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
@@ -802,12 +814,15 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		Form_pg_type typclass;
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
 			continue;
 
+		if (att->attgenerated && !publish_generated_column)
+			continue;
+
 		if (isnull[i])
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@@ -923,7 +938,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
+					   bool publish_generated_column)
 {
 	TupleDesc	desc;
 	int			i;
@@ -938,12 +954,15 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
 			continue;
 
+		if (att->attgenerated && !publish_generated_column)
+			continue;
+
 		nliveatts++;
 	}
 	pq_sendint16(out, nliveatts);
@@ -959,12 +978,15 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 		uint8		flags = 0;
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
 			continue;
 
 		if (!column_in_column_list(att->attnum, columns))
 			continue;
 
+		if (att->attgenerated && !publish_generated_column)
+			continue;
+
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f139e7b01e..5de1531567 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -421,7 +421,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 			int			attnum;
 			Form_pg_attribute attr = TupleDescAttr(desc, i);
 
-			if (attr->attisdropped || attr->attgenerated)
+			if (attr->attisdropped)
 			{
 				entry->attrmap->attnums[i] = -1;
 				continue;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..83a689d1f5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4379,6 +4379,7 @@ set_stream_options(WalRcvStreamOptions *options,
 
 	options->proto.logical.twophase = false;
 	options->proto.logical.origin = pstrdup(MySubscription->origin);
+	options->proto.logical.generated_column = MySubscription->generatedcolumn;
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d2b35cfb96..f86a3bf152 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -86,7 +86,8 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
-									Bitmapset *columns);
+									Bitmapset *columns,
+									bool publish_generated_column);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -283,11 +284,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		generate_column_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->publish_generated_column = false;
 
 	foreach(lc, options)
 	{
@@ -396,6 +399,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "include_generated_columns") == 0)
+		{
+			if (generate_column_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			generate_column_option_given = true;
+
+			data->publish_generated_column = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -731,11 +744,13 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	{
 		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);
 
-		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns,
+								data->publish_generated_column);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
+	send_relation_and_attrs(relation, xid, ctx, relentry->columns,
+							data->publish_generated_column);
 
 	if (data->in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -749,7 +764,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
 						LogicalDecodingContext *ctx,
-						Bitmapset *columns)
+						Bitmapset *columns, bool publish_generated_column)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -766,7 +781,10 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-		if (att->attisdropped || att->attgenerated)
+		if (att->attisdropped)
+			continue;
+
+		if (att->attgenerated && !publish_generated_column)
 			continue;
 
 		if (att->atttypid < FirstGenbkiObjectId)
@@ -782,7 +800,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation, columns);
+	logicalrep_write_rel(ctx->out, xid, relation, columns, publish_generated_column);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -1085,7 +1103,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 					{
 						Form_pg_attribute att = TupleDescAttr(desc, i);
 
-						if (att->attisdropped || att->attgenerated)
+						if (att->attisdropped)
 							continue;
 
 						nliveatts++;
@@ -1531,15 +1549,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+									data->binary, relentry->columns,
+									data->publish_generated_column);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
-									new_slot, data->binary, relentry->columns);
+									new_slot, data->binary, relentry->columns,
+									data->publish_generated_column);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
-									data->binary, relentry->columns);
+									data->binary, relentry->columns,
+									data->publish_generated_column);
 			break;
 		default:
 			Assert(false);
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..a5cbf68af5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subgeneratedcolumn;	/* True if generated colums must be published */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -157,6 +159,7 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	bool		generatedcolumn;	/* publish generated column data */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..2676acefce 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -225,18 +225,22 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *columns,
+									bool publish_generated_column);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+									TupleTableSlot *newslot, bool binary,
+									Bitmapset *columns,
+									bool publish_generated_column);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
 									Relation rel, TupleTableSlot *oldslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *columns,
+									bool publish_generated_column);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
 											  LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
@@ -247,7 +251,8 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel, Bitmapset *columns);
+								 Relation rel, Bitmapset *columns,
+								 bool publish_generated_column);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..c4773f60a3 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		publish_generated_column;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 12f71fa99b..f3b8f22a7d 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		generated_column; /* publish generated columns */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 30b6371134..d33d05c6e9 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -687,9 +687,9 @@ UPDATE testpub_tbl5 SET a = 1;
 ERROR:  cannot update table "testpub_tbl5"
 DETAIL:  Column list used by the publication does not cover the replica identity.
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
+-- ok
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-ERROR:  cannot use generated column "d" in publication column list
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ERROR:  cannot use system column "ctid" in publication column list
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 479d4f3264..26d91be9d0 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -413,8 +413,9 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
 UPDATE testpub_tbl5 SET a = 1;
 ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
+-- ok
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 -- ok
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9a97fa5020..6e73f892e9 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1202,7 +1202,7 @@ $result = $node_publisher->safe_psql(
 is( $result, qq(t
 t), 'check the number of columns in the old tuple');
 
-# TEST: Generated and dropped columns are not considered for the column list.
+# TEST: Dropped columns are not considered for the column list.
 # So, the publication having a column list except for those columns and a
 # publication without any column (aka all columns as part of the columns
 # list) are considered to have the same column list.
@@ -1211,7 +1211,7 @@ $node_publisher->safe_psql(
 	CREATE TABLE test_mix_4 (a int PRIMARY KEY, b int, c int, d int GENERATED ALWAYS AS (a + 1) STORED);
 	ALTER TABLE test_mix_4 DROP COLUMN c;
 
-	CREATE PUBLICATION pub_mix_7 FOR TABLE test_mix_4 (a, b);
+	CREATE PUBLICATION pub_mix_7 FOR TABLE test_mix_4 (a, b, d);
 	CREATE PUBLICATION pub_mix_8 FOR TABLE test_mix_4;
 
 	-- initial data
-- 
2.41.0.windows.3

