From 1e6115cb99a1286a61cb0a6a088f7476da29d0b9 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 29 Apr 2022 16:57:03 +0000
Subject: [PATCH 10/10] Fail replication worker on DDL command that rewrites
 table using volatile functions, such as ALTER TABLE tab ADD COLUMN col
 DEFAULT volatile_expr. This is to avoid data mismatch compared to the
 publisher. We can potentially unblock this type of command when table rewrite
 is supported in logical replication.

---
 src/backend/replication/logical/worker.c | 44 ++++++++++++++++++++++++
 src/test/subscription/t/030_rep_ddls.pl  | 22 ++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f5274531b7..93281cb88d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -157,6 +157,7 @@
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
 #include "parser/analyze.h"
+#include "parser/parse_expr.h"
 #include "pgstat.h"
 #include "postmaster/bgworker.h"
 #include "postmaster/interrupt.h"
@@ -2548,6 +2549,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
 
 		commandTag = CreateCommandTag((Node *)command);
 
+		/* The following DDL commands need special handling */
+
 		/*
 		 * Remember the schemaname and relname if the cmd is going to create a table
 		 * because we will need them for some post-processing after we
@@ -2595,6 +2598,47 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
 				sstmt->intoClause->skipData = true;
 			}
 		}
+		/*
+		 * ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported.
+		 * Until we support logical replication of table rewrite, see ATRewriteTables()
+		 * for details on table rewrite.
+		 */
+		else if (commandTag == CMDTAG_ALTER_TABLE)
+		{
+			AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt;
+			ListCell *lc;
+
+			foreach(lc, atstmt->cmds)
+			{
+				AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc);
+
+				if (cmd->subtype == AT_AddColumn)
+				{
+					ColumnDef *colDef;
+					ListCell *c;
+
+					colDef = castNode(ColumnDef, cmd->def);
+					foreach(c, colDef->constraints)
+					{
+						Constraint *con = lfirst_node(Constraint, c);
+
+						if (con->contype == CONSTR_DEFAULT)
+						{
+							Node *expr;
+							ParseState *pstate = make_parsestate(NULL);
+
+							expr = transformExpr(pstate, copyObject(con->raw_expr), EXPR_KIND_COLUMN_DEFAULT);
+							if (contain_volatile_functions(expr))
+							{
+								elog(ERROR,
+									"Do not support replication of DDL statement that rewrites table using volatile functions: %s",
+									cmdstr);
+							}
+						}
+					}
+				}
+			}
+		}
 
 		/*
 		 * Set up a snapshot if parse analysis/planning will need one.
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index 34b9d51eb1..b4df1bfefd 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -386,7 +386,29 @@ $node_publisher->wait_for_catchup('mysub');
 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.proc_table where c3 = 22;");
 is($result, qq(1), 'DDLs in procedure are replicated');
 
+# Test Alter table alter column type stmt is replicated
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ALTER COLUMN name type TEXT;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT data_type FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'name';");
+is($result, qq(text), 'Alter table column type stmt is replicated');
+
+# Test Alter table add column default 0.01 is replicated
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN non_volatile double precision DEFAULT 0.01;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep where non_volatile = 0.01;");
+is($result, qq(2), 'Alter table add column default 0.01 is replicated');
+
 #TODO TEST certain DDLs are not replicated
+# Test DDL statement that rewrites table with volatile functions are not replicated
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();");
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'volatile';");
+is($result, qq(1), 'Alter table add column default random() is executed on the publisher DB.');
+
+$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result);
 
 pass "DDL replication tests passed!";
 
-- 
2.32.0

