From 0b50b29852030fe62a17c8526c2eea84bf23fa27 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Mon, 28 Oct 2024 17:03:00 +0800
Subject: [PATCH v44 2/2] Support copy generated column during table sync

---
 src/backend/replication/logical/tablesync.c | 50 +++++++++++++--------
 src/test/subscription/t/031_column_list.pl  |  9 +++-
 2 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d4b5d210e3..6bb44c0d71 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -791,19 +791,20 @@ copy_read_data(void *outbuf, int minread, int maxread)
  * qualifications to be used in the COPY command.
  */
 static void
-fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel, List **qual)
+fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
+						List **qual, bool *remotegencolpresent)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
 	int			natt;
 	StringInfo	pub_names = NULL;
 	Bitmapset  *included_cols = NULL;
+	int			server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -851,7 +852,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * We need to do this before fetching info about column names and types,
 	 * so that we can skip columns that should not be replicated.
 	 */
-	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	if (server_version >= 150000)
 	{
 		WalRcvExecResult *pubres;
 		TupleTableSlot *tslot;
@@ -867,9 +868,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		 */
 		resetStringInfo(&cmd);
 		appendStringInfo(&cmd,
-						 "SELECT DISTINCT"
-						 "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
-						 "   THEN NULL ELSE gpt.attrs END)"
+						 "SELECT DISTINCT gpt.attrs"
 						 "  FROM pg_publication p,"
 						 "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
 						 "  pg_class c"
@@ -941,20 +940,21 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 "SELECT a.attnum,"
 					 "       a.attname,"
 					 "       a.atttypid,"
-					 "       a.attnum = ANY(i.indkey)"
+					 "       a.attnum = ANY(i.indkey)");
+
+	if (server_version >= 180000)
+		appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+	appendStringInfo(&cmd,
 					 "  FROM pg_catalog.pg_attribute a"
 					 "  LEFT JOIN pg_catalog.pg_index i"
 					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
 					 " WHERE a.attnum > 0::pg_catalog.int2"
-					 "   AND NOT a.attisdropped %s"
+					 "   AND NOT a.attisdropped"
 					 "   AND a.attrelid = %u"
-					 " ORDER BY a.attnum",
-					 lrel->remoteid,
-					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
-					  "AND a.attgenerated = ''" : ""),
-					 lrel->remoteid);
+					 " ORDER BY a.attnum", lrel->remoteid, lrel->remoteid);
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
-					  lengthof(attrRow), attrRow);
+					  server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -998,6 +998,9 @@ fetch_remote_table_info(char *nspname, char *relname,
 		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
+		if (server_version >= 180000)
+			*remotegencolpresent |= DatumGetBool(slot_getattr(slot, 5, &isnull));
+
 		/* Should never happen. */
 		if (++natt >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1030,7 +1033,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
 	 * that includes this relation
 	 */
-	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	if (server_version >= 150000)
 	{
 		/* Reuse the already-built pub_names. */
 		Assert(pub_names != NULL);
@@ -1106,10 +1109,12 @@ copy_table(Relation rel)
 	List	   *attnamelist;
 	ParseState *pstate;
 	List	   *options = NIL;
+	bool		gencol_copy_needed = false;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &lrel, &qual,
+							&gencol_copy_needed);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1121,8 +1126,11 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 
-	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	/*
+	 * Regular table with no row filter and copy of generated columns is not
+	 * necessary.
+	 */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_copy_needed)
 	{
 		appendStringInfo(&cmd, "COPY %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1156,6 +1164,10 @@ copy_table(Relation rel)
 		 * (SELECT ...), but we can't just do SELECT * because we need to not
 		 * copy generated columns. For tables with any row filters, build a
 		 * SELECT query with OR'ed row filters for COPY.
+		 *
+		 * We also need to use this same COPY (SELECT ...) syntax when
+		 * generated columns are published, because copy of generated columns
+		 * is not supported by the normal COPY.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index d4408c1cd7..2d605a4095 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -1280,6 +1280,7 @@ ok( $stderr =~
 $node_publisher->safe_psql(
 	'postgres', qq(
 	CREATE TABLE test_gen (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 1) STORED);
+	INSERT INTO test_gen VALUES (0);
 	CREATE PUBLICATION pub_gen FOR TABLE test_gen (a, b);
 ));
 
@@ -1291,6 +1292,11 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->wait_for_subscription_sync;
 
+is( $node_subscriber->safe_psql(
+		'postgres', "SELECT * FROM test_gen ORDER BY a"),
+	qq(0|1),
+	'replication with generated columns in column list');
+
 $node_publisher->safe_psql(
 	'postgres', qq(
 	INSERT INTO test_gen VALUES (1);
@@ -1300,7 +1306,8 @@ $node_publisher->wait_for_catchup('sub_gen');
 
 is( $node_subscriber->safe_psql(
 		'postgres', "SELECT * FROM test_gen ORDER BY a"),
-	qq(1|2),
+	qq(0|1
+1|2),
 	'replication with generated columns in column list');
 
 # TEST: If the column list is changed after creating the subscription, we
-- 
2.34.1

