Em qua, 25 de set de 2019 às 08:08, Euler Taveira
<eu...@timbira.com.br> escreveu:
>
> I'll send a patchset later today.
>
... and it is attached.


-- 
   Euler Taveira                                   Timbira -
http://www.timbira.com.br/
   PostgreSQL: Consultoria, Desenvolvimento, Suporte 24x7 e Treinamento
From b5d4d1369dbb4e7ec20182507dc5ae920dd8d2e9 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 9 Mar 2018 18:39:22 +0000
Subject: [PATCH 1/8] Remove unused atttypmod column from initial table
 synchronization

 atttypmod was added by commit 7c4f52409a8c7d85ed169bbbc1f6092274d03920
 but has never been used. The removal is safe because COPY from the
 publisher does not need such information.
---
 src/backend/replication/logical/tablesync.c | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 7881079..0a565dd 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -647,7 +647,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {OIDOID, CHAROID};
-	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+	Oid			attrRow[3] = {TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
 
@@ -691,7 +691,6 @@ fetch_remote_table_info(char *nspname, char *relname,
 	appendStringInfo(&cmd,
 					 "SELECT a.attname,"
 					 "       a.atttypid,"
-					 "       a.atttypmod,"
 					 "       a.attnum = ANY(i.indkey)"
 					 "  FROM pg_catalog.pg_attribute a"
 					 "  LEFT JOIN pg_catalog.pg_index i"
@@ -703,7 +702,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 					 lrel->remoteid,
 					 (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
 					 lrel->remoteid);
-	res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, attrRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -724,7 +723,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		Assert(!isnull);
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
+		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */
-- 
2.7.4

From 406b2dbe4df63a94364e548a67d085e255ea2644 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 9 Mar 2018 17:37:36 +0000
Subject: [PATCH 2/8] Store number of tuples in WalRcvExecResult

This is useful information when allocating memory for queries that
return more than one row. It reduces memory allocation for initial
table synchronization.
---
 src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 5 +++--
 src/backend/replication/logical/tablesync.c                 | 5 ++---
 src/include/replication/walreceiver.h                       | 1 +
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6eba08a..343550a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -878,6 +878,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 				 errdetail("Expected %d fields, got %d fields.",
 						   nRetTypes, nfields)));
 
+	walres->ntuples = PQntuples(pgres);
 	walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
 
 	/* Create tuple descriptor corresponding to expected result. */
@@ -888,7 +889,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
 
 	/* No point in doing more here if there were no tuples returned. */
-	if (PQntuples(pgres) == 0)
+	if (walres->ntuples == 0)
 		return;
 
 	/* Create temporary context for local allocations. */
@@ -897,7 +898,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 									   ALLOCSET_DEFAULT_SIZES);
 
 	/* Process returned rows. */
-	for (tupn = 0; tupn < PQntuples(pgres); tupn++)
+	for (tupn = 0; tupn < walres->ntuples; tupn++)
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0a565dd..42db4ad 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -709,9 +709,8 @@ fetch_remote_table_info(char *nspname, char *relname,
 				(errmsg("could not fetch table info for table \"%s.%s\": %s",
 						nspname, relname, res->err)));
 
-	/* We don't know the number of rows coming, so allocate enough space. */
-	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
-	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
+	lrel->attnames = palloc0(res->ntuples * sizeof(char *));
+	lrel->atttyps = palloc0(res->ntuples * sizeof(Oid));
 	lrel->attkeys = NULL;
 
 	natt = 0;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e12a934..0d32d59 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -196,6 +196,7 @@ typedef struct WalRcvExecResult
 	char	   *err;
 	Tuplestorestate *tuplestore;
 	TupleDesc	tupledesc;
+	int			ntuples;
 } WalRcvExecResult;
 
 /* libpqwalreceiver hooks */
-- 
2.7.4

From 428c6f3959b67627e9f1c92fdf38d71bb66163ef Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 24 Jan 2018 17:01:31 -0200
Subject: [PATCH 4/8] Rename a WHERE node

A WHERE clause will be used for row filtering in logical replication. We
already have a similar node: 'WHERE (condition here)'. Let's rename the
node to a generic name and use it for row filtering too.
---
 src/backend/parser/gram.y | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3f67aaf..21bef5c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -476,7 +476,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	def_arg columnElem where_clause where_or_current_clause
 				a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound
 				columnref in_expr having_clause func_table xmltable array_expr
-				ExclusionWhereClause operator_def_arg
+				OptWhereClause operator_def_arg
 %type <list>	rowsfrom_item rowsfrom_list opt_col_def_list
 %type <boolean> opt_ordinality
 %type <list>	ExclusionConstraintList ExclusionConstraintElem
@@ -3711,7 +3711,7 @@ ConstraintElem:
 					$$ = (Node *)n;
 				}
 			| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
-				opt_c_include opt_definition OptConsTableSpace  ExclusionWhereClause
+				opt_c_include opt_definition OptConsTableSpace  OptWhereClause
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
@@ -3813,7 +3813,7 @@ ExclusionConstraintElem: index_elem WITH any_operator
 			}
 		;
 
-ExclusionWhereClause:
+OptWhereClause:
 			WHERE '(' a_expr ')'					{ $$ = $3; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
-- 
2.7.4

From 45231f2c46b61aabb0fcb4f938589a6c21aad2c5 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 02:21:03 +0000
Subject: [PATCH 3/8] Refactor function create_estate_for_relation

Relation localrel is the only LogicalRepRelMapEntry structure member
that is useful for create_estate_for_relation.
---
 src/backend/replication/logical/worker.c | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 11e6331..d9952c8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,7 +173,7 @@ ensure_transaction(void)
  * This is based on similar code in copy.c
  */
 static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel)
+create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
 	ResultRelInfo *resultRelInfo;
@@ -183,13 +183,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
 
 	rte = makeNode(RangeTblEntry);
 	rte->rtekind = RTE_RELATION;
-	rte->relid = RelationGetRelid(rel->localrel);
-	rte->relkind = rel->localrel->rd_rel->relkind;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
 	rte->rellockmode = AccessShareLock;
 	ExecInitRangeTable(estate, list_make1(rte));
 
 	resultRelInfo = makeNode(ResultRelInfo);
-	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+	InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
 
 	estate->es_result_relations = resultRelInfo;
 	estate->es_num_result_relations = 1;
@@ -589,7 +589,7 @@ apply_handle_insert(StringInfo s)
 	}
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
@@ -696,7 +696,7 @@ apply_handle_update(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
@@ -815,7 +815,7 @@ apply_handle_delete(StringInfo s)
 	check_relation_updatable(rel);
 
 	/* Initialize the executor state. */
-	estate = create_estate_for_relation(rel);
+	estate = create_estate_for_relation(rel->localrel);
 	remoteslot = ExecInitExtraTupleSlot(estate,
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
-- 
2.7.4

From e14f1892427778b728c0a684e68d89ee172a4679 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Tue, 27 Feb 2018 04:03:13 +0000
Subject: [PATCH 5/8] Row filtering for logical replication

When you define or modify a publication, you can optionally filter the
rows to be published using a WHERE condition. The condition can be any
expression that evaluates to boolean. Only those rows that satisfy the
WHERE condition will be sent to subscribers.
---
 doc/src/sgml/catalogs.sgml                  |   9 ++
 doc/src/sgml/ref/alter_publication.sgml     |  11 ++-
 doc/src/sgml/ref/create_publication.sgml    |  26 +++++-
 src/backend/catalog/pg_publication.c        | 102 ++++++++++++++++++++--
 src/backend/commands/publicationcmds.c      |  89 +++++++++++++------
 src/backend/parser/gram.y                   |  26 ++++--
 src/backend/parser/parse_agg.c              |  10 +++
 src/backend/parser/parse_expr.c             |  14 ++-
 src/backend/parser/parse_func.c             |   3 +
 src/backend/replication/logical/tablesync.c | 127 +++++++++++++++++++++++++---
 src/backend/replication/logical/worker.c    |   2 +-
 src/backend/replication/pgoutput/pgoutput.c | 101 +++++++++++++++++++++-
 src/include/catalog/pg_publication.h        |   9 +-
 src/include/catalog/pg_publication_rel.h    |   4 +
 src/include/catalog/toasting.h              |   1 +
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/parsenodes.h              |  11 ++-
 src/include/parser/parse_node.h             |   1 +
 src/include/replication/logicalrelation.h   |   2 +
 src/test/regress/expected/publication.out   |  29 +++++++
 src/test/regress/sql/publication.sql        |  21 +++++
 src/test/subscription/t/013_row_filter.pl   |  96 +++++++++++++++++++++
 22 files changed, 634 insertions(+), 61 deletions(-)
 create mode 100644 src/test/subscription/t/013_row_filter.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 5e71a2e..7f11225 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5595,6 +5595,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry><literal><link linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry>
       <entry>Reference to relation</entry>
      </row>
+
+     <row>
+      <entry><structfield>prqual</structfield></entry>
+      <entry><type>pg_node_tree</type></entry>
+      <entry></entry>
+      <entry>Expression tree (in the form of a
+      <function>nodeToString()</function> representation) for the relation's
+      qualifying condition</entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 534e598..9608448 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -21,8 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
@@ -91,7 +91,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">expression</replaceable> will
+      not be published. Note that parentheses are required around the
+      expression. The <replaceable class="parameter">expression</replaceable>
+      is executed with the role used for the replication connection.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index 99f87ca..6e99943 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
       | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 
@@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included. If the optional
+      <literal>WHERE</literal> clause is specified, rows that do not satisfy
+      the <replaceable class="parameter">expression</replaceable> will not be
+      published. Note that parentheses are required around the expression.
      </para>
 
      <para>
@@ -157,6 +160,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   </para>
 
   <para>
+  Columns used in the <literal>WHERE</literal> clause must be part of the
+  primary key or be covered by <literal>REPLICA IDENTITY</literal>; otherwise,
+  <command>UPDATE</command> and <command>DELETE</command> operations will not
+  be replicated.
+  </para>
+
+  <para>
    For an <command>INSERT ... ON CONFLICT</command> command, the publication will
    publish the operation that actually results from the command.  So depending
    of the outcome, it may be published as either <command>INSERT</command> or
@@ -171,6 +181,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   <para>
    <acronym>DDL</acronym> operations are not published.
   </para>
+
+  <para>
+  The <literal>WHERE</literal> clause expression is executed with the role used
+  for the replication connection.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -184,6 +199,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
   </para>
 
   <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
+  <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
 CREATE PUBLICATION alltables FOR ALL TABLES;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index fd5da7d..f5462dc 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -34,6 +34,10 @@
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -149,18 +153,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState *pstate;
+	RangeTblEntry *rte;
+	Node	   *whereclause;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -180,10 +187,27 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
+
+	/* Set up a pstate to parse with */
+	pstate = make_parsestate(NULL);
+	pstate->p_sourcetext = nodeToString(targetrel->whereClause);
+
+	rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
+										AccessShareLock,
+										NULL, false, false);
+	addRTEtoQuery(pstate, rte, false, true, true);
+
+	whereclause = transformWhereClause(pstate,
+									   copyObject(targetrel->whereClause),
+									   EXPR_KIND_PUBLICATION_WHERE,
+									   "PUBLICATION");
+
+	/* Fix up collation information */
+	assign_expr_collations(pstate, whereclause);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -197,6 +221,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add qualifications, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prqual - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -213,11 +243,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the qualifications */
+	if (whereclause)
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+
+	free_parsestate(pstate);
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
@@ -292,6 +328,62 @@ GetPublicationRelations(Oid pubid)
 }
 
 /*
+ * Gets list of PublicationRelationQuals (an opened relation plus its optional
+ * row filter) for a publication.  Each relation is opened here with
+ * ShareUpdateExclusiveLock; closing them is left to the caller.
+ */
+List *
+GetPublicationRelationQuals(Oid pubid)
+{
+	List	   *result = NIL;
+	Relation	pubrelsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all relations associated with the publication. */
+	pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_rel_prpubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+							  true, NULL, 1, &scankey);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_rel pubrel;
+		PublicationRelationQual *relqual;
+		Datum		value_datum;
+		Node	   *qual_expr = NULL;
+		bool		isnull;
+
+		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		/* prqual is stored as NULL when the table has no WHERE clause */
+		value_datum = heap_getattr(tup, Anum_pg_publication_rel_prqual, RelationGetDescr(pubrelsrel), &isnull);
+		if (!isnull)
+		{
+			char	   *qual_value = TextDatumGetCString(value_datum);
+
+			qual_expr = (Node *) stringToNode(qual_value);
+		}
+
+		relqual = palloc(sizeof(PublicationRelationQual));
+		relqual->relation = table_open(pubrel->prrelid, ShareUpdateExclusiveLock);
+		relqual->whereClause = copyObject(qual_expr);
+		result = lappend(result, relqual);
+	}
+
+	systable_endscan(scan);
+	table_close(pubrelsrel, AccessShareLock);
+
+	return result;
+}
+
+/*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
 List *
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f115d4b..2606377 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -352,6 +352,28 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	/*
+	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  The
+	 * grammar uses the publication_table_list node (which accepts a WHERE
+	 * clause) here too, so reject the clause explicitly.  Using a separate
+	 * relation_expr_list node just for DROP TABLE is not worth the trouble.
+	 */
+	if (stmt->tableAction == DEFELEM_DROP)
+	{
+		ListCell	*lc;
+
+		foreach(lc, stmt->tables)
+		{
+			PublicationTable *t = lfirst(lc);
+
+			if (t->whereClause)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
+								NameStr(pubform->pubname))));
+		}
+	}
+
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
@@ -360,47 +382,56 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 		PublicationDropTables(pubid, rels, false);
 	else						/* DEFELEM_SET */
 	{
-		List	   *oldrelids = GetPublicationRelations(pubid);
+		List	   *oldrels = GetPublicationRelationQuals(pubid);
 		List	   *delrels = NIL;
 		ListCell   *oldlc;
 
 		/* Calculate which relations to drop. */
-		foreach(oldlc, oldrelids)
+		foreach(oldlc, oldrels)
 		{
-			Oid			oldrelid = lfirst_oid(oldlc);
+			PublicationRelationQual *oldrel = lfirst(oldlc);
+			PublicationRelationQual *newrel;
 			ListCell   *newlc;
 			bool		found = false;
 
 			foreach(newlc, rels)
 			{
-				Relation	newrel = (Relation) lfirst(newlc);
+				newrel = (PublicationRelationQual *) lfirst(newlc);
 
-				if (RelationGetRelid(newrel) == oldrelid)
+				if (RelationGetRelid(newrel->relation) == RelationGetRelid(oldrel->relation))
 				{
 					found = true;
 					break;
 				}
 			}
 
-			if (!found)
+			/*
+			 * Remove publication / relation mapping iff (i) the table is not
+			 * found in the new list or (ii) the table is found in the new list
+			 * but its qual does not match the old one (in this case, a simple
+			 * tuple update is not enough because of the dependencies).
+			 */
+			if (!found || (found && !equal(oldrel->whereClause, newrel->whereClause)))
 			{
-				Relation	oldrel = table_open(oldrelid,
-												ShareUpdateExclusiveLock);
+				PublicationRelationQual *oldrelqual = palloc(sizeof(PublicationRelationQual));
 
-				delrels = lappend(delrels, oldrel);
+				oldrelqual->relation = table_open(RelationGetRelid(oldrel->relation),
+												  ShareUpdateExclusiveLock);
+
+				delrels = lappend(delrels, oldrelqual);
 			}
 		}
 
 		/* And drop them. */
 		PublicationDropTables(pubid, delrels, true);
+		CloseTableList(oldrels);
+		CloseTableList(delrels);
 
 		/*
 		 * Don't bother calculating the difference for adding, we'll catch and
 		 * skip existing ones when doing catalog update.
 		 */
 		PublicationAddTables(pubid, rels, true, stmt);
-
-		CloseTableList(delrels);
 	}
 
 	CloseTableList(rels);
@@ -510,13 +541,15 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	PublicationRelationQual *relqual;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
+		PublicationTable *t = lfirst(lc);
+		RangeVar   *rv = castNode(RangeVar, t->relation);
 		bool		recurse = rv->inh;
 		Relation	rel;
 		Oid			myrelid;
@@ -539,8 +572,10 @@ OpenTableList(List *tables)
 			table_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
-
-		rels = lappend(rels, rel);
+		relqual = palloc(sizeof(PublicationRelationQual));
+		relqual->relation = rel;
+		relqual->whereClause = t->whereClause;
+		rels = lappend(rels, relqual);
 		relids = lappend_oid(relids, myrelid);
 
 		/* Add children of this rel, if requested */
@@ -568,7 +603,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				relqual = palloc(sizeof(PublicationRelationQual));
+				relqual->relation = rel;
+				/* child inherits WHERE clause from parent */
+				relqual->whereClause = t->whereClause;
+				rels = lappend(rels, relqual);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -589,10 +628,12 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
 
-		table_close(rel, NoLock);
+		table_close(rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -608,13 +649,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
-		if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
-			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
-						   RelationGetRelationName(rel));
+		if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
+						   RelationGetRelationName(rel->relation));
 
 		obj = publication_add_relation(pubid, rel, if_not_exists);
 		if (stmt)
@@ -640,8 +681,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
-		Oid			relid = RelationGetRelid(rel);
+		PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
+		Oid			relid = RelationGetRelid(rel->relation);
 
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
@@ -654,7 +695,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("relation \"%s\" is not part of the publication",
-							RelationGetRelationName(rel))));
+							RelationGetRelationName(rel->relation))));
 		}
 
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 21bef5c..8cad2bc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -404,13 +404,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				relation_expr_list dostmt_opt_list
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
-				publication_name_list
+				publication_name_list publication_table_list
 				vacuum_relation_list opt_vacuum_relation_list
 
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table_elem
 %type <value>	publication_name_item
 
 %type <list>	opt_fdw_options fdw_options
@@ -9547,7 +9547,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9578,7 +9578,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9586,7 +9586,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_ADD;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name SET TABLE relation_expr_list
+			| ALTER PUBLICATION name SET TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9594,7 +9594,7 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_SET;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name DROP TABLE relation_expr_list
+			| ALTER PUBLICATION name DROP TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
@@ -9604,6 +9604,20 @@ AlterPublicationStmt:
 				}
 		;
 
+publication_table_list:
+			publication_table_elem									{ $$ = list_make1($1); }
+			| publication_table_list ',' publication_table_elem		{ $$ = lappend($1, $3); }
+		;
+
+publication_table_elem: relation_expr OptWhereClause
+				{
+					PublicationTable *n = makeNode(PublicationTable);
+					n->relation = $1;
+					n->whereClause = $2;
+					$$ = (Node *) n;
+				}
+		;
+
 /*****************************************************************************
  *
  * CREATE SUBSCRIPTION name ...
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index f418c61..dea5aad 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -544,6 +544,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in COPY FROM WHERE conditions");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE expressions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE expressions");
+
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -933,6 +940,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("window functions are not allowed in column generation expressions");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 76f3dd7..6d2c6a2 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -170,6 +170,13 @@ transformExprRecurse(ParseState *pstate, Node *expr)
 	/* Guard against stack overflow due to overly complex expressions */
 	check_stack_depth();
 
+	/* Functions are not allowed in publication WHERE clauses */
+	if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && nodeTag(expr) == T_FuncCall)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("functions are not allowed in WHERE"),
+				 parser_errposition(pstate, exprLocation(expr))));
+
 	switch (nodeTag(expr))
 	{
 		case T_ColumnRef:
@@ -571,6 +578,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref)
 		case EXPR_KIND_CALL_ARGUMENT:
 		case EXPR_KIND_COPY_WHERE:
 		case EXPR_KIND_GENERATED_COLUMN:
+		case EXPR_KIND_PUBLICATION_WHERE:
 			/* okay */
 			break;
 
@@ -1924,13 +1932,16 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 			break;
 		case EXPR_KIND_CALL_ARGUMENT:
 			err = _("cannot use subquery in CALL argument");
 			break;
 		case EXPR_KIND_COPY_WHERE:
 			err = _("cannot use subquery in COPY FROM WHERE condition");
 			break;
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("cannot use subquery in column generation expression");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE expression");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -3561,6 +3572,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "WHERE";
 		case EXPR_KIND_GENERATED_COLUMN:
 			return "GENERATED AS";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication expression";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 8e92653..66458d8 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2516,6 +2516,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("set-returning functions are not allowed in column generation expressions");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 42db4ad..d3999b1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -637,19 +637,26 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 /*
  * Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in COPY.
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel)
+						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {OIDOID, CHAROID};
 	Oid			attrRow[3] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			qualRow[1] = {TEXTOID};
 	bool		isnull;
-	int			natt;
+	int			n;
+	ListCell   *lc;
+	bool		first;
+
+	/* Avoid trashing relation map cache */
+	memset(lrel, 0, sizeof(LogicalRepRelation));
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -713,20 +720,20 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->atttyps = palloc0(res->ntuples * sizeof(Oid));
 	lrel->attkeys = NULL;
 
-	natt = 0;
+	n = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
+		lrel->attnames[n] =
 			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+		lrel->atttyps[n] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
-			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+			lrel->attkeys = bms_add_member(lrel->attkeys, n);
 
 		/* Should never happen. */
-		if (++natt >= MaxTupleAttributeNumber)
+		if (++n >= MaxTupleAttributeNumber)
 			elog(ERROR, "too many columns in remote table \"%s.%s\"",
 				 nspname, relname);
 
@@ -734,7 +741,52 @@ fetch_remote_table_info(char *nspname, char *relname,
 	}
 	ExecDropSingleTupleTableSlot(slot);
 
-	lrel->natts = natt;
+	lrel->natts = n;
+
+	walrcv_clear_result(res);
+
+	/* Get relation qual */
+	resetStringInfo(&cmd);
+	appendStringInfo(&cmd,
+						"SELECT pg_get_expr(prqual, prrelid) "
+						"  FROM pg_publication p "
+						"  INNER JOIN pg_publication_rel pr "
+						"       ON (p.oid = pr.prpubid) "
+						" WHERE pr.prrelid = %u "
+						"   AND p.pubname IN (", lrel->remoteid);
+
+	first = true;
+	foreach(lc, MySubscription->publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 1, qualRow);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s",
+						nspname, relname, res->err)));
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		Datum		rf = slot_getattr(slot, 1, &isnull);
+
+		if (!isnull)
+			*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+
+		ExecClearTuple(slot);
+	}
+	ExecDropSingleTupleTableSlot(slot);
 
 	walrcv_clear_result(res);
 	pfree(cmd.data);
@@ -750,6 +802,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
+	List	   *qual = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyState	cstate;
@@ -758,7 +811,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel);
+							RelationGetRelationName(rel), &lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -767,10 +820,59 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	/* list of columns for COPY */
+	attnamelist = make_copy_attnamelist(relmapentry);
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "COPY %s TO STDOUT",
-					 quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+	/*
+	 * If publication has any row filter, build a SELECT query with OR'ed row
+	 * filters for COPY. If no row filters are available, use COPY for all
+	 * table contents.
+	 */
+	if (list_length(qual) > 0)
+	{
+		ListCell   *lc;
+		bool		first;
+
+		appendStringInfoString(&cmd, "COPY (SELECT ");
+		/* list of attribute names */
+		first = true;
+		foreach(lc, attnamelist)
+		{
+			char	   *col = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+			appendStringInfo(&cmd, "%s", quote_identifier(col));
+		}
+		appendStringInfo(&cmd, " FROM ONLY %s",
+						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		appendStringInfoString(&cmd, " WHERE ");
+		/* list of OR'ed filters */
+		first = true;
+		foreach(lc, qual)
+		{
+			char	   *q = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, " OR ");
+			appendStringInfo(&cmd, "%s", q);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
+		list_free_deep(qual);
+	}
+	else
+	{
+		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+	}
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -785,7 +887,6 @@ copy_table(Relation rel)
 	addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
 								  NULL, false, false);
 
-	attnamelist = make_copy_attnamelist(relmapentry);
 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
 
 	/* Do the copy */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9952c8..cef0c52 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -172,7 +172,7 @@ ensure_transaction(void)
  *
  * This is based on similar code in copy.c
  */
-static EState *
+EState *
 create_estate_for_relation(Relation rel)
 {
 	EState	   *estate;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757..63596e2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,15 +12,26 @@
  */
 #include "postgres.h"
 
+#include "catalog/pg_type.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+
+#include "executor/executor.h"
+#include "nodes/execnodes.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planner.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_coerce.h"
 
 #include "fmgr.h"
 
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 
+#include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
 #include "utils/memutils.h"
@@ -60,6 +71,7 @@ typedef struct RelationSyncEntry
 	bool		schema_sent;	/* did we send the schema? */
 	bool		replicate_valid;
 	PublicationActions pubactions;
+	List	   *qual;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -335,6 +347,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* ... then check row filter */
+	if (list_length(relentry->qual) > 0)
+	{
+		HeapTuple	old_tuple;
+		HeapTuple	new_tuple;
+		TupleDesc	tupdesc;
+		EState	   *estate;
+		ExprContext *ecxt;
+		MemoryContext oldcxt;
+		ListCell   *lc;
+		bool		matched = true;
+
+		old_tuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
+		new_tuple = change->data.tp.newtuple ? &change->data.tp.newtuple->tuple : NULL;
+		tupdesc = RelationGetDescr(relation);
+		estate = create_estate_for_relation(relation);
+
+		/* prepare context per tuple */
+		ecxt = GetPerTupleExprContext(estate);
+		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
+		ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc, &TTSOpsHeapTuple);
+
+		ExecStoreHeapTuple(new_tuple ? new_tuple : old_tuple, ecxt->ecxt_scantuple, false);
+
+		foreach(lc, relentry->qual)
+		{
+			Node	   *qual;
+			ExprState  *expr_state;
+			Expr	   *expr;
+			Oid			expr_type;
+			Datum		res;
+			bool		isnull;
+
+			qual = (Node *) lfirst(lc);
+
+			/* evaluates row filter */
+			expr_type = exprType(qual);
+			expr = (Expr *) coerce_to_target_type(NULL, qual, expr_type, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+			expr = expression_planner(expr);
+			expr_state = ExecInitExpr(expr, NULL);
+			res = ExecEvalExpr(expr_state, ecxt, &isnull);
+
+			/* if tuple does not match row filter, bail out */
+			if (!DatumGetBool(res) || isnull)
+			{
+				matched = false;
+				break;
+			}
+		}
+
+		MemoryContextSwitchTo(oldcxt);
+
+		ExecDropSingleTupleTableSlot(ecxt->ecxt_scantuple);
+		FreeExecutorState(estate);
+
+		if (!matched)
+			return;
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -570,10 +641,14 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		 */
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->qual = NIL;
 
 		foreach(lc, data->publications)
 		{
 			Publication *pub = lfirst(lc);
+			HeapTuple	rf_tuple;
+			Datum		rf_datum;
+			bool		rf_isnull;
 
 			if (pub->alltables || list_member_oid(pubids, pub->oid))
 			{
@@ -583,9 +658,24 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
+			/* Cache row filters, if available */
+			rf_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rf_tuple))
+			{
+				rf_datum = SysCacheGetAttr(PUBLICATIONRELMAP, rf_tuple, Anum_pg_publication_rel_prqual, &rf_isnull);
+
+				if (!rf_isnull)
+				{
+					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					char	   *s = TextDatumGetCString(rf_datum);
+					Node	   *rf_node = stringToNode(s);
+
+					entry->qual = lappend(entry->qual, rf_node);
+					MemoryContextSwitchTo(oldctx);
+				}
+
+				ReleaseSysCache(rf_tuple);
+			}
 		}
 
 		list_free(pubids);
@@ -660,5 +750,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
 		entry->replicate_valid = false;
+		if (list_length(entry->qual) > 0)
+			list_free_deep(entry->qual);
+		entry->qual = NIL;
+	}
 }
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 20a2f0a..5261666 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -78,15 +78,22 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationQual
+{
+	Relation	relation;		/* relation handed to publication_add_relation() */
+	Node	   *whereClause;	/* WHERE clause for that relation; presumably NULL when none was given -- TODO confirm */
+} PublicationRelationQual;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
 extern List *GetPublicationRelations(Oid pubid);
+extern List *GetPublicationRelationQuals(Oid pubid);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
 											  bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index 5f5bc92..7fd5915 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid;		/* Oid of the publication */
 	Oid			prrelid;		/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN			/* variable-length fields start here */
+	pg_node_tree prqual;		/* qualifications */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/catalog/toasting.h b/src/include/catalog/toasting.h
index cc5dfed..d57ca82 100644
--- a/src/include/catalog/toasting.h
+++ b/src/include/catalog/toasting.h
@@ -66,6 +66,7 @@ DECLARE_TOAST(pg_namespace, 4163, 4164);
 DECLARE_TOAST(pg_partitioned_table, 4165, 4166);
 DECLARE_TOAST(pg_policy, 4167, 4168);
 DECLARE_TOAST(pg_proc, 2836, 2837);
+DECLARE_TOAST(pg_publication_rel, 8287, 8288);
 DECLARE_TOAST(pg_rewrite, 2838, 2839);
 DECLARE_TOAST(pg_seclabel, 3598, 3599);
 DECLARE_TOAST(pg_statistic, 2840, 2841);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index bce2d59..52522d0 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -477,6 +477,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index d93a79a..ca9920c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3474,12 +3474,19 @@ typedef struct AlterTSConfigurationStmt
 } AlterTSConfigurationStmt;
 
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar   *relation;		/* relation to be published */
+	Node	   *whereClause;	/* qualifications */
+} PublicationTable;
+
 typedef struct CreatePublicationStmt
 {
 	NodeTag		type;
 	char	   *pubname;		/* Name of the publication */
 	List	   *options;		/* List of DefElem nodes */
-	List	   *tables;			/* Optional list of tables to add */
+	List	   *tables;			/* Optional list of PublicationTable to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
 } CreatePublicationStmt;
 
@@ -3492,7 +3499,7 @@ typedef struct AlterPublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 
 	/* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
-	List	   *tables;			/* List of tables to add/drop */
+	List	   *tables;			/* List of PublicationTable to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
 } AlterPublicationStmt;
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 7c099e7..a5c9109 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -73,6 +73,7 @@ typedef enum ParseExprKind
 	EXPR_KIND_CALL_ARGUMENT,	/* procedure argument in CALL */
 	EXPR_KIND_COPY_WHERE,		/* WHERE condition in COPY FROM */
 	EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */
+	EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 2642a3f..5cc307e 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -39,4 +39,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
+extern EState *create_estate_for_relation(Relation rel);
+
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index feb51e4..202173c 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -116,6 +116,35 @@ Tables:
 
 DROP TABLE testpub_tbl3, testpub_tbl3a;
 DROP PUBLICATION testpub3, testpub4;
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+ERROR:  functions are not allowed in WHERE
+LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ...
+                                                             ^
+\dRp+ testpub5
+                              Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
+Tables:
+    "public.testpub_rf_tbl3"  WHERE ((e > 300) AND (e < 500))
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP PUBLICATION testpub5;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 5773a75..6f0d088 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -69,6 +69,27 @@ RESET client_min_messages;
 DROP TABLE testpub_tbl3, testpub_tbl3a;
 DROP PUBLICATION testpub3, testpub4;
 
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+\dRp+ testpub5
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP PUBLICATION testpub5;
+
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 SET client_min_messages = 'ERROR';
diff --git a/src/test/subscription/t/013_row_filter.pl b/src/test/subscription/t/013_row_filter.pl
new file mode 100644
index 0000000..99e6db9
--- /dev/null
+++ b/src/test/subscription/t/013_row_filter.pl
@@ -0,0 +1,96 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')");
+
+my $result = $node_publisher->psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 DROP TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')");
+is($result, 3, "syntax error for ALTER PUBLICATION DROP TABLE");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)");
+
+# test row filtering
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 10)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+#$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab_rowfilter_1 ORDER BY a");
+is($result, qq(1001|test 1001
+1002|test 1002
+1980|not filtered), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(7|2|10), 'check filtered data was copied to subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check filtered data was copied to subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.7.4

From a5ca3aa97cd50b796f58fe60e8dbc9ed196aac9b Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Thu, 17 May 2018 20:52:28 +0000
Subject: [PATCH 6/8] Print publication WHERE condition in psql

---
 src/bin/psql/describe.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index d7c0fc0..76404e0 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5876,7 +5876,8 @@ describePublications(const char *pattern)
 		if (!puballtables)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
+							  "SELECT n.nspname, c.relname,\n"
+							  "  pg_get_expr(pr.prqual, c.oid)\n"
 							  "FROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
@@ -5906,6 +5907,10 @@ describePublications(const char *pattern)
 								  PQgetvalue(tabres, j, 0),
 								  PQgetvalue(tabres, j, 1));
 
+				if (!PQgetisnull(tabres, j, 2))
+					appendPQExpBuffer(&buf, "  WHERE %s",
+									  PQgetvalue(tabres, j, 2));
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(tabres);
-- 
2.7.4

From 90f046605051e79739372339ffeae982fd45e328 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Wed, 14 Mar 2018 00:53:17 +0000
Subject: [PATCH 8/8] Debug for row filtering

---
 src/backend/commands/publicationcmds.c      | 11 +++++
 src/backend/replication/logical/tablesync.c |  1 +
 src/backend/replication/pgoutput/pgoutput.c | 66 +++++++++++++++++++++++++++++
 3 files changed, 78 insertions(+)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 2606377..b2378c6 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -341,6 +341,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	List	   *rels = NIL;
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 	Oid			pubid = pubform->oid;
+	ListCell	*lc;
 
 	/* Check that user is allowed to manipulate the publication tables. */
 	if (pubform->puballtables)
@@ -352,6 +353,16 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 
 	Assert(list_length(stmt->tables) > 0);
 
+	foreach(lc, stmt->tables)
+	{
+		PublicationTable *t = lfirst(lc);
+
+		if (t->whereClause == NULL)
+			elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname));
+		else
+			elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname));
+	}
+
 	/*
 	 * ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause.  Use
 	 * publication_table_list node (that accepts a WHERE clause) but forbid
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d3999b1..2f586c0 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -873,6 +873,7 @@ copy_table(Relation rel)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	}
+	elog(DEBUG2, "COPY for initial synchronization: %s", cmd.data);
 	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 63596e2..6306bc3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -34,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/int8.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -323,6 +324,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 
+	Form_pg_class	class_form;
+	char			*schemaname;
+	char			*tablename;
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -347,6 +352,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	class_form = RelationGetForm(relation);
+	schemaname = get_namespace_name(class_form->relnamespace);
+	tablename = NameStr(class_form->relname);
+
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+		elog(DEBUG1, "INSERT \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
+		elog(DEBUG1, "UPDATE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
+		elog(DEBUG1, "DELETE \"%s\".\"%s\" txid: %u", schemaname, tablename, txn->xid);
+
 	/* ... then check row filter */
 	if (list_length(relentry->qual) > 0)
 	{
@@ -364,6 +380,42 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		tupdesc = RelationGetDescr(relation);
 		estate = create_estate_for_relation(relation);
 
+#ifdef	_NOT_USED
+		if (old_tuple)
+		{
+			int i;
+
+			for (i = 0; i < tupdesc->natts; i++)
+			{
+				Form_pg_attribute	attr;
+				HeapTuple			type_tuple;
+				Oid					typoutput;
+				bool				typisvarlena;
+				bool				isnull;
+				Datum				val;
+				char				*outputstr = NULL;
+
+				attr = TupleDescAttr(tupdesc, i);
+
+				/* Figure out type name */
+				type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(attr->atttypid));
+				if (HeapTupleIsValid(type_tuple))
+				{
+					/* Get information needed for printing values of a type */
+					getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
+
+					val = heap_getattr(old_tuple, i + 1, tupdesc, &isnull);
+					if (!isnull)
+					{
+						outputstr = OidOutputFunctionCall(typoutput, val);
+						elog(DEBUG2, "row filter: REPLICA IDENTITY %s: %s", NameStr(attr->attname), outputstr);
+						pfree(outputstr);
+					}
+				}
+			}
+		}
+#endif
+
 		/* prepare context per tuple */
 		ecxt = GetPerTupleExprContext(estate);
 		oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -379,6 +431,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Oid			expr_type;
 			Datum		res;
 			bool		isnull;
+			char		*s = NULL;
 
 			qual = (Node *) lfirst(lc);
 
@@ -389,12 +442,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			expr_state = ExecInitExpr(expr, NULL);
 			res = ExecEvalExpr(expr_state, ecxt, &isnull);
 
+			elog(DEBUG3, "row filter: result: %s ; isnull: %s", (DatumGetBool(res)) ? "true" : "false", (isnull) ? "true" : "false");
+
 			/* if tuple does not match row filter, bail out */
 			if (!DatumGetBool(res) || isnull)
 			{
+				s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(qual)), ObjectIdGetDatum(relentry->relid)));
+				elog(DEBUG2, "row filter \"%s\" was not matched", s);
+				pfree(s);
+
 				matched = false;
 				break;
 			}
+
+			s = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, CStringGetTextDatum(nodeToString(qual)), ObjectIdGetDatum(relentry->relid)));
+			elog(DEBUG2, "row filter \"%s\" was matched", s);
+			pfree(s);
 		}
 
 		MemoryContextSwitchTo(oldcxt);
@@ -668,10 +731,13 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				{
 					MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 					char	   *s = TextDatumGetCString(rf_datum);
+					char	   *t = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, rf_datum, ObjectIdGetDatum(entry->relid)));
 					Node	   *rf_node = stringToNode(s);
 
 					entry->qual = lappend(entry->qual, rf_node);
 					MemoryContextSwitchTo(oldctx);
+
+					elog(DEBUG2, "row filter \"%s\" found for publication \"%s\" and relation \"%s\"", t, pub->name, get_rel_name(relid));
 				}
 
 				ReleaseSysCache(rf_tuple);
-- 
2.7.4

From 3897f998a328fbd42824fe265a15e76ec1247703 Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Sat, 15 Sep 2018 02:52:00 +0000
Subject: [PATCH 7/8] Publication WHERE condition support for pg_dump

---
 src/bin/pg_dump/pg_dump.c | 15 +++++++++++++--
 src/bin/pg_dump/pg_dump.h |  1 +
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f01fea5..3c37134 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3959,6 +3959,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_tableoid;
 	int			i_oid;
 	int			i_pubname;
+	int			i_pubrelqual;
 	int			i,
 				j,
 				ntups;
@@ -3991,7 +3992,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 		/* Get the publication membership for the table. */
 		appendPQExpBuffer(query,
-						  "SELECT pr.tableoid, pr.oid, p.pubname "
+						  "SELECT pr.tableoid, pr.oid, p.pubname, "
+						  "pg_catalog.pg_get_expr(pr.prqual, pr.prrelid) AS pubrelqual "
 						  "FROM pg_publication_rel pr, pg_publication p "
 						  "WHERE pr.prrelid = '%u'"
 						  "  AND p.oid = pr.prpubid",
@@ -4012,6 +4014,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		i_tableoid = PQfnumber(res, "tableoid");
 		i_oid = PQfnumber(res, "oid");
 		i_pubname = PQfnumber(res, "pubname");
+		i_pubrelqual = PQfnumber(res, "pubrelqual");
 
 		pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
 
@@ -4027,6 +4030,11 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 			pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
 			pubrinfo[j].pubtable = tbinfo;
 
+			if (PQgetisnull(res, j, i_pubrelqual))
+				pubrinfo[j].pubrelqual = NULL;
+			else
+				pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, j, i_pubrelqual));
+
 			/* Decide whether we want to dump it */
 			selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
 		}
@@ -4055,8 +4063,11 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
 
 	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
 					  fmtId(pubrinfo->pubname));
-	appendPQExpBuffer(query, " %s;\n",
+	appendPQExpBuffer(query, " %s",
 					  fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrelqual)
+		appendPQExpBuffer(query, " WHERE %s", pubrinfo->pubrelqual);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ec5a924..8d61faa 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -609,6 +609,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	TableInfo  *pubtable;
 	char	   *pubname;
+	char	   *pubrelqual;
 } PublicationRelInfo;
 
 /*
-- 
2.7.4

Reply via email to