From 19d34a71eafaa10f29ae3e797156f92d147a6527 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Mon, 3 Jun 2024 17:09:05 +0530
Subject: [PATCH v5 3/3] Support copy of generated columns during tablesync

Support copying generated columns during tablesync when
'include_generated_column' is set to 'true' while creating the subscription.
---
 doc/src/sgml/ref/create_subscription.sgml   |  9 ++-
 src/backend/commands/subscriptioncmds.c     | 70 ++++++++++++++-------
 src/backend/replication/logical/tablesync.c |  5 +-
 src/test/subscription/t/011_generated.pl    | 30 ++++++---
 4 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 57520b5aef..04d133a42e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -439,11 +439,10 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
 
          <para>
-          This parameter can only be set true if <literal>copy_data</literal> is
-          set to <literal>false</literal>. If the subscriber-side column is also a
-          generated column then this option has no effect; the replicated data will
-          be ignored and the subscriber column will be filled as normal with the
-          subscriber-side computed or default data.
+          If the subscriber-side column is also a generated column, then this option
+          has no effect; the replicated data will be ignored and the subscriber
+          column will be filled as normal with the subscriber-side computed or
+          default data.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8d245722bf..3e78a758c1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -103,7 +103,7 @@ typedef struct SubOpts
 	bool		include_generated_column;
 } SubOpts;
 
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_table_list(WalReceiverConn *wrconn, List *publications, bool include_generated_column);
 static void check_publications_origin(WalReceiverConn *wrconn,
 									  List *publications, bool copydata,
 									  char *origin, Oid *subrel_local_oids,
@@ -459,20 +459,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
-
-	/*
-	 * Do additional checking for disallowed combination when copy_data and
-	 * include_generated_column are true. COPY of generated columns is not supported
-	 * yet.
-	 */
-	if (opts->copy_data && opts->include_generated_column)
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-		/*- translator: both %s are strings of the form "option = value" */
-					errmsg("%s and %s are mutually exclusive options",
-						"copy_data = true", "include_generated_column = true")));
-	}
 }
 
 /*
@@ -802,7 +788,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * Get the table list from publisher and build local table status
 			 * info.
 			 */
-			tables = fetch_table_list(wrconn, publications);
+			tables = fetch_table_list(wrconn, publications, opts.include_generated_column);
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
@@ -925,7 +911,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			check_publications(wrconn, validate_publications);
 
 		/* Get the table list from publisher. */
-		pubrel_names = fetch_table_list(wrconn, sub->publications);
+		pubrel_names = fetch_table_list(wrconn, sub->publications, sub->includegeneratedcolumn);
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid, false);
@@ -2161,15 +2147,17 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
  * list and row filter are specified for different publications.
  */
 static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_table_list(WalReceiverConn *wrconn, List *publications, bool include_generated_column)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
+	Oid			tableRow[4] = {TEXTOID, TEXTOID, InvalidOid, InvalidOid};
 	List	   *tablelist = NIL;
 	int			server_version = walrcv_server_version(wrconn);
 	bool		check_columnlist = (server_version >= 150000);
+	bool		check_gen_col = (server_version >= 170000);
+	int 		column_count;
 
 	initStringInfo(&cmd);
 
@@ -2195,8 +2183,19 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		 * to worry if different publications have specified them in a
 		 * different order. See publication_translate_columns.
 		 */
-		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
-						 "       FROM pg_class c\n"
+		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n");
+
+		/*
+		 * Get the count of generated columns in the table in the publication.
+		 */
+		if(!include_generated_column && check_gen_col)
+		{
+			tableRow[3] = INT8OID;
+			appendStringInfo(&cmd, ", (SELECT COUNT(*) FROM pg_attribute a where a.attrelid = c.oid\n"
+								   " and a.attnum = ANY(gpt.attrs) and a.attgenerated = 's') gen_col_count\n");
+		}
+
+		appendStringInfo(&cmd, "  FROM pg_class c\n"
 						 "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
 						 "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
 						 "                FROM pg_publication\n"
@@ -2221,7 +2220,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		appendStringInfoChar(&cmd, ')');
 	}
 
-	res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
+	column_count = (!include_generated_column && check_gen_col) ? 4 : (check_columnlist ? 3 : 2);
+	res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -2236,6 +2236,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	{
 		char	   *nspname;
 		char	   *relname;
+		ArrayType  *attlist;
+		int			gen_col_count;
+		int			attcount;
+		Datum		attlistdatum;
 		bool		isnull;
 		RangeVar   *rv;
 
@@ -2244,6 +2248,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 
+		/* attlistdatum can be NULL when the publication is created on a table with no columns */
+		attlistdatum = slot_getattr(slot, 3, &isnull);
+
+		/*
+		 * If the include_generated_column option is false and all the columns of the table in the
+		 * publication are generated, then we should throw an error.
+		 */
+		if (!isnull && !include_generated_column && check_gen_col)
+		{
+			attlist = DatumGetArrayTypeP(attlistdatum);
+			gen_col_count = DatumGetInt32(slot_getattr(slot, 4, &isnull));
+			Assert(!isnull);
+
+			attcount = ArrayGetNItems(ARR_NDIM(attlist), ARR_DIMS(attlist));
+
+			if (attcount != 0 && attcount == gen_col_count)
+				ereport(ERROR,
+						errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("cannot use only generated column for table \"%s.%s\" in publication when generated_column option is false",
+						   nspname, relname));
+		}
+
 		rv = makeRangeVar(nspname, relname, -1);
 
 		if (check_columnlist && list_member(tablelist, rv))
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b00267f042..71716cb937 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -957,8 +957,9 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "   AND a.attrelid = %u"
 					 " ORDER BY a.attnum",
 					 lrel->remoteid,
-					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
-					  "AND a.attgenerated = ''" : ""),
+					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 && 
+					 (walrcv_server_version(LogRepWorkerWalRcvConn) <= 160000 ||
+					 !MySubscription->includegeneratedcolumn) ? "AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
 					  lengthof(attrRow), attrRow);
diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl
index e7a48a02d3..e597927a61 100644
--- a/src/test/subscription/t/011_generated.pl
+++ b/src/test/subscription/t/011_generated.pl
@@ -28,6 +28,10 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab2 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)"
 );
 
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)"
+);
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED, c int)"
 );
@@ -36,31 +40,34 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE TABLE tab2 (a int PRIMARY KEY, b int)"
 );
 
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (a int PRIMARY KEY, b int)"
+);
+
 # data for initial sync
 
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab1 (a) VALUES (1), (2), (3)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab2 (a) VALUES (1), (2), (3)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 (a) VALUES (1), (2), (3)");
 
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub1 FOR TABLE tab1");
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub2 FOR TABLE tab2");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub3 FOR TABLE tab3");
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
 );
-
-my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres', qq(
-	CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (include_generated_column = true)
-));
-ok( $stderr =~
-	  qr/copy_data = true and include_generated_column = true are mutually exclusive options/,
-	'cannot use both include_generated_column and copy_data as true');
-
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (include_generated_column = true, copy_data = false)"
 );
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (include_generated_column = true)"
+);
 
 # Wait for initial sync of all subscriptions
 $node_subscriber->wait_for_subscription_sync;
@@ -70,6 +77,11 @@ is( $result, qq(1|22
 2|44
 3|66), 'generated columns initial sync');
 
+$result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab3");
+is( $result, qq(1|2
+2|4
+3|6), 'generated columns initial sync with include_generated_column = true');
+
 # data to replicate
 
 $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (4), (5)");
@@ -87,7 +99,7 @@ is( $result, qq(1|22|
 
 $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (4), (5)");
 
-$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2");
 is( $result, qq(4|8
-- 
2.41.0.windows.3

