From 711e05c4ae42d415fc13e2a6983a05fb8eeec154 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sun, 14 Sep 2025 12:13:40 +0530
Subject: [PATCH v1 2/2] Create conflict history table if it does not exist

---
 src/backend/commands/subscriptioncmds.c | 83 +++++++++++++++++++++----
 1 file changed, 70 insertions(+), 13 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c2f2fdabadb..f919d357a7e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -123,7 +123,9 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
 								bool slot_needs_update, bool isTopLevel);
-static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel);
+static void CreateConflictHistoryTable(Oid namespaceId, char *conflictrel);
+static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel,
+										 Oid relid);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -576,6 +578,54 @@ publicationListToArray(List *publist)
 	return PointerGetDatum(arr);
 }
 
+/*
+ * CreateConflictHistoryTable: Create conflict log history table.
+ *
+ * The subscription creator becomes the owner of this table and has all
+ * privileges on it.
+ */
+static void
+CreateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+{
+	StringInfoData 	querybuf;
+
+	initStringInfo(&querybuf);
+
+	/*
+	 * Build and execute the CREATE TABLE query.
+	 */
+	appendStringInfo(&querybuf,
+					 "CREATE TABLE %s.%s ("
+					 "relid	Oid,"						/* Oid of relation */
+					 "local_xid xid,"	/* local xid at the time of conflict */
+					 "remote_xid xid,"	/* remote node xid that produced the conflicting change */
+					 "local_lsn pg_lsn,"	/* local lsn at the time of conflict */
+					 "remote_commit_lsn pg_lsn,"	/* commit lsn of the remote transaction */
+					 "local_commit_ts TIMESTAMPTZ,"	/* commit ts of the local tuple */
+					 "remote_commit_ts TIMESTAMPTZ,"	/* commit ts of the remote tuple */
+					 "table_schema	TEXT,"	/* name of the schema */
+					 "table_name	TEXT,"	/* name of the table */
+					 "conflict_type TEXT,"	/* conflict type */
+					 "local_origin	TEXT,"	/* origin of remote tuple */
+					 "remote_origin	TEXT,"	/* origin of remote tuple */
+					 "key_tuple		JSON,"	/* json representation of the key used for searching */
+					 "local_tuple	JSON,"	/* json representation of the local tuple */
+					 "remote_tuple	JSON)",	 /* json representation of the remote tuple */
+					 quote_identifier(get_namespace_name(namespaceId)),
+					 quote_identifier(conflictrel));
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	pfree(querybuf.data);
+}
+
 /*
  * ValidateConflictHistoryTable - Validate conflict history table
  *
@@ -583,7 +633,8 @@ publicationListToArray(List *publist)
  * conflict log history table.
  */
 static void
-ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel,
+							 Oid relid)
 {
 	Datum		value;
 	Relation	pg_attribute;
@@ -592,17 +643,9 @@ ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel)
 	ScanKeyData scankey;
 	SysScanDesc scan;
 	HeapTuple	atup;
-	Oid			relid;
 	int			attcnt = 0;
 	bool		tbl_ok = true;
 
-	relid = get_relname_relid(conflictrel, namespaceId);
-	if (!OidIsValid(relid))
-		ereport(ERROR,
-				errcode(ERRCODE_UNDEFINED_TABLE),
-				errmsg("relation \"%s.%s\" does not exist",
-						get_namespace_name(namespaceId), conflictrel));
-
 	/* log history table must be a regular realtion */
 	if (get_rel_relkind(relid) != RELKIND_RELATION)
 		ereport(ERROR,
@@ -959,8 +1002,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	/* If conflict log history table name is given than create the table. */
 	if (opts.conflicttable)
-		ValidateConflictHistoryTable(conflict_table_nspid,
-									 conflict_table);
+	{
+		Oid relid = get_relname_relid(conflict_table, conflict_table_nspid);
+
+		if (!OidIsValid(relid))
+			CreateConflictHistoryTable(conflict_table_nspid, conflict_table);
+		else
+			ValidateConflictHistoryTable(conflict_table_nspid,
+										 conflict_table, relid);
+	}
+
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
@@ -1753,6 +1804,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_TABLE))
 				{
 					Oid		nspid;
+					Oid		relid;
 					char   *relname = NULL;
 					List   *names =
 						stringToQualifiedNameList(opts.conflicttable, NULL);
@@ -1766,7 +1818,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subconflictnspid - 1] = true;
 					replaces[Anum_pg_subscription_subconflicttable - 1] = true;
 
-					ValidateConflictHistoryTable(nspid, relname);
+					relid = get_relname_relid(relname, nspid);
+
+					if (!OidIsValid(relid))
+						CreateConflictHistoryTable(nspid, relname);
+					else
+						ValidateConflictHistoryTable(nspid, relname, relid);
 				}
 
 				update_tuple = true;
-- 
2.49.0

