From 373a1cfb2dea0b23ca7894a5467201ce75db9ae9 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Fri, 11 Mar 2022 10:08:40 +0800
Subject: [PATCH] Try to get column filters with one SQL.

---
 src/backend/replication/logical/tablesync.c | 87 ++++++---------------
 1 file changed, 24 insertions(+), 63 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6eb9fc902b..82c9003b54 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -782,23 +782,16 @@ fetch_remote_table_info(char *nspname, char *relname,
 			first = false;
 		}
 
-		/*
-		 * Check for column filters - we first check if there's any publication
-		 * that has no column list for the given relation, which means we shall
-		 * replicate all columns.
-		 *
-		 * It's easier than having to do this separately, and only then do the
-		 * second query 
-		 */
 		resetStringInfo(&cmd);
 		appendStringInfo(&cmd,
-						 "SELECT 1"
+						 "SELECT DISTINCT unnest"
 						 "  FROM pg_publication p"
 						 "  LEFT OUTER JOIN pg_publication_rel pr"
-						 "       ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
+						 "       ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
+						 "  LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
 						 "  LATERAL pg_get_publication_tables(p.pubname) gpt"
 						 " WHERE gpt.relid = %u"
-						 "   AND p.pubname IN ( %s ) AND pr.prattrs IS NULL LIMIT 1",
+						 "   AND p.pubname IN ( %s )",
 						 lrel->remoteid,
 						 lrel->remoteid,
 						 pub_names.data);
@@ -812,65 +805,33 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
 							nspname, relname, pubres->err)));
 
+		/*
+		 * Multiple column list expressions for the same table will be combined
+		 * by merging them. If any of the lists for this table are null, it
+		 * means the whole table will be copied. In this case it is not necessary
+		 * to construct a unified column list expression at all.
+		 */
 		slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
-		if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
-			all_columns = true;
-
-		ExecDropSingleTupleTableSlot(slot);
-		walrcv_clear_result(pubres);
-
-		if (!all_columns)
+		while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
 		{
-			resetStringInfo(&cmd);
-			appendStringInfo(&cmd,
-							 "SELECT unnest(pr.prattrs)"
-							 "  FROM pg_publication p"
-							 "  LEFT OUTER JOIN pg_publication_rel pr"
-							 "       ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
-							 "  LATERAL pg_get_publication_tables(p.pubname) gpt"
-							 " WHERE gpt.relid = %u"
-							 "   AND p.pubname IN ( %s ) AND pr.prattrs IS NOT NULL",
-							 lrel->remoteid,
-							 lrel->remoteid,
-							 pub_names.data);
-
-			pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
-								 lengthof(attrsRow), attrsRow);
-
-			if (pubres->status != WALRCV_OK_TUPLES)
-				ereport(ERROR,
-						(errcode(ERRCODE_CONNECTION_FAILURE),
-						 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
-								nspname, relname, pubres->err)));
+			Datum	cfval = slot_getattr(slot, 1, &isnull);
 
-			/*
-			 * Multiple column list expressions for the same table will be combined
-			 * by merging them. If any of the lists for this table are null, it
-			 * means the whole table will be copied. In this case it is not necessary
-			 * to construct a unified column list expression at all.
-			 */
-			slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
-			while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+			/* if there's no column list, we need to replicate all columns */
+			if (isnull)
 			{
-				Datum	cfval = slot_getattr(slot, 1, &isnull);
-
-				/* if there's no column list, we need to replicate all columns */
-				if (isnull)
-				{
-					bms_free(included_cols);
-					included_cols = NULL;
-					break;
-				}
-
-				included_cols = bms_add_member(included_cols,
-											   DatumGetInt16(cfval));
-
-				ExecClearTuple(slot);
+				bms_free(included_cols);
+				included_cols = NULL;
+				break;
 			}
-			ExecDropSingleTupleTableSlot(slot);
+			else
+				included_cols = bms_add_member(included_cols,
+											DatumGetInt16(cfval));
 
-			walrcv_clear_result(pubres);
+			ExecClearTuple(slot);
 		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(pubres);
 
 		pfree(pub_names.data);
 	}
-- 
2.18.4

