From fb30e58b819e6c65736c35192c114b5239ccd01c Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler@timbira.com.br>
Date: Wed, 14 Mar 2018 00:53:17 +0000
Subject: [PATCH 8/8] Debug for row filtering

---
 src/backend/commands/publicationcmds.c      | 11 +++++
 src/backend/replication/logical/tablesync.c |  1 +
 src/backend/replication/pgoutput/pgoutput.c | 66 +++++++++++++++++++++++++++++
 3 files changed, 78 insertions(+)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f6a91a33f6..17167a23ab 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -337,6 +337,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	List	   *rels = NIL;
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 	Oid			pubid = pubform->oid;
+	ListCell	*lc;
 
 	/* Check that user is allowed to manipulate the publication tables. */
 	if (pubform->puballtables)
@@ -348,6 +349,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	foreach(lc, stmt->tables)
+	{
+		PublicationTable *t = lfirst(lc);
+
+		if (t->whereClause == NULL)
+			elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname));
+		else
+			elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname));
+	}
+
 	/*
 	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
 	 * publication_table_list node (that accepts a WHERE clause) but forbid
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 5d58d1cd66..3af9b2015c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -866,6 +866,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
+	elog(DEBUG2, "COPY for initial synchronization: %s", cmd.data);
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9500c5e52a..a57557d209 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -30,6 +30,7 @@
 #include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -319,6 +320,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 
+	Form_pg_class	class_form;
+	char			*schemaname;
+	char			*tablename;
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -343,6 +348,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	class_form = RelationGetForm(relation);
+	schemaname = get_namespace_name(class_form->relnamespace);
+	tablename = NameStr(class_form->relname);
+
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+		elog(DEBUG1, "INSERT \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
+		elog(DEBUG1, "UPDATE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
+		elog(DEBUG1, "DELETE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+
 	/* ... then check row filter */
 	if (list_length(relentry->qual) > 0)
 	{
@@ -360,6 +376,42 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		tupdesc = RelationGetDescr(relation);
 		estate = create_estate_for_relation(relation);
 
+#ifdef	_NOT_USED
+		if (old_tuple)
+		{
+			int i;
+
+			for (i = 0; i < tupdesc->natts; i++)
+			{
+				Form_pg_attribute	attr;
+				HeapTuple			type_tuple;
+				Oid					typoutput;
+				bool				typisvarlena;
+				bool				isnull;
+				Datum				val;
+				char				*outputstr = NULL;
+
+				attr = TupleDescAttr(tupdesc, i);
+
+				/* Figure out type name */
+				type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid));
+				if (HeapTupleIsValid(type_tuple))
+				{
+					/* Get information needed for printing values of a type */
+					getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
+
+					val = heap_getattr(old_tuple, i + 1, tupdesc, &isnull);
+					if (!isnull)
+					{
+						outputstr = OidOutputFunctionCall(typoutput, val);
+						elog(DEBUG2, "row filter: REPLICA IDENTITY %s: %s", NameStr(attr->attname), outputstr);
+						pfree(outputstr);
+					}
+				}
+			}
+		}
+#endif
+
 		/* prepare context per tuple */
 		ecxt = GetPerTupleExprContext(estate);
 		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -375,6 +427,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Oid			expr_type;
 			Datum		res;
 			bool		isnull;
+			char		*s = NULL;
 
 			qual = (Node *) lfirst(lc);
 
@@ -385,12 +438,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			expr_state = ExecInitExpr(expr, NULL);
 			res = ExecEvalExpr(expr_state, ecxt, &isnull);
 
+			elog(DEBUG3, "row filter: result: %s ; isnull: %s", (DatumGetBool(res)) ? "true" : "false", (isnull) ? "true" : "false");
+
 			/* if tuple does not match row filter, bail out */
 			if (!DatumGetBool(res) || isnull)
 			{
+				s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(qual)), ObjectIdGetDatum(relentry->relid)));
+				elog(DEBUG2, "row filter \"%s\" was not matched", s);
+				pfree(s);
+
 				matched = false;
 				break;
 			}
+
+			s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(qual)), ObjectIdGetDatum(relentry->relid)));
+			elog(DEBUG2, "row filter \"%s\" was matched", s);
+			pfree(s);
 		}
 
 		MemoryContextSwitchTo(oldcxt);
@@ -664,10 +727,13 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				{
 					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 					char	   *s = TextDatumGetCString(rf_datum);
+					char	   *t = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, rf_datum, ObjectIdGetDatum(entry->relid)));
 					Node	   *rf_node = stringToNode(s);
 
 					entry->qual = lappend(entry->qual, rf_node);
 					MemoryContextSwitchTo(oldctx);
+
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", t, pub->name, get_rel_name(relid));
 				}
 
 				ReleaseSysCache(rf_tuple);
-- 
2.11.0

