On Mon, Jan 26, 2026 at 12:30 PM Masahiko Sawada <[email protected]> wrote:
>
> On Mon, Jan 19, 2026 at 9:44 AM Marcos Pegoraro <[email protected]> wrote:
> >
> > On Fri, Dec 19, 2025 at 22:59, Masahiko Sawada
> > <[email protected]> wrote:
> >>
> >> Yeah, if we pass pg_get_publication_tables() a publication that a lot
> >> of tables belong to, it could take a long time to return, as it
> >> needs to construct many entries.
> >
> >
> > Well, I don't know how to help, but I'm sure it's performing badly.
> > Today I added some columns on my server, and in the logs I could see how
> > slow this process is.
> >
> > duration: 2213.872 ms statement: SELECT DISTINCT (CASE WHEN
> > (array_length(gpt.attrs, 1) = c.relnatts) THEN NULL ELSE gpt.attrs END)
> > FROM pg_publication p, LATERAL pg_get_publication_tables(p.pubname) gpt,
> > pg_class c WHERE gpt.relid = 274376788 AND c.oid = gpt.relid AND
> > p.pubname IN ( 'mypub' )
> >
> > Two seconds to get the column list of a single table is far too slow.
> > How can we solve this?
>
> After further investigating the slowness, it seems that the
> list_concat_unique_oid() call below is quite slow when the database
> has a lot of tables to publish:
>
>         relids = GetPublicationRelations(pub_elem->oid,
>                                          pub_elem->pubviaroot ?
>                                          PUBLICATION_PART_ROOT :
>                                          PUBLICATION_PART_LEAF);
>         schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
>                                                         pub_elem->pubviaroot ?
>                                                         PUBLICATION_PART_ROOT :
>                                                         PUBLICATION_PART_LEAF);
>         pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
>
> This is simply because it's O(n^2), where n is the number of OIDs in
> schemarelids in the test case. A simple fix would be to sort and
> deduplicate instead. With the attached experimental patch, the
> pg_get_publication_tables() execution time is roughly halved in my
> environment (796 ms -> 430 ms with 50k tables). When the number of
> tables is small, this approach might be slower than the current one,
> but the regression is not large.
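
The sort-and-deduplicate alternative described above can be illustrated with a standalone sketch. Plain `Oid` arrays stand in for PostgreSQL's `List`, and `concat_unique_quadratic()` and `concat_sort_dedup()` are hypothetical helper names, not code from the experimental patch. The first mimics the behavior of `list_concat_unique_oid()`, which scans the accumulated result for every appended OID (O(n*m)); the second concatenates, sorts with `qsort()`, and drops adjacent duplicates (O(n log n)). The caller is assumed to supply an output buffer with room for `na + nb` elements.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Oid;    /* stand-in for PostgreSQL's Oid type */

/*
 * O(n*m): copy a, then append each element of b unless it is already in
 * the output, scanning the output linearly as list_concat_unique_oid() does.
 */
static size_t
concat_unique_quadratic(const Oid *a, size_t na, const Oid *b, size_t nb,
                        Oid *out)
{
    size_t n = 0;

    assert(out != NULL);
    for (size_t i = 0; i < na; i++)
        out[n++] = a[i];

    for (size_t i = 0; i < nb; i++)
    {
        int found = 0;

        /* linear membership test against everything kept so far */
        for (size_t j = 0; j < n && !found; j++)
            found = (out[j] == b[i]);
        if (!found)
            out[n++] = b[i];
    }
    return n;
}

/* qsort() comparator; avoids the overflow that "v1 - v2" could cause */
static int
oid_cmp(const void *p1, const void *p2)
{
    Oid v1 = *(const Oid *) p1;
    Oid v2 = *(const Oid *) p2;

    return (v1 > v2) - (v1 < v2);
}

/* O((n+m) log(n+m)): concatenate, sort, then drop adjacent duplicates */
static size_t
concat_sort_dedup(const Oid *a, size_t na, const Oid *b, size_t nb, Oid *out)
{
    size_t n = 0;
    size_t w = 1;

    assert(out != NULL);
    for (size_t i = 0; i < na; i++)
        out[n++] = a[i];
    for (size_t i = 0; i < nb; i++)
        out[n++] = b[i];
    if (n == 0)
        return 0;

    qsort(out, n, sizeof(Oid), oid_cmp);
    for (size_t r = 1; r < n; r++)
    {
        if (out[r] != out[w - 1])
            out[w++] = out[r];
    }
    return w;
}
```

Both functions return the same set of OIDs as long as the first input has no internal duplicates, but in a different order: the quadratic version preserves insertion order, while the sorted version returns ascending OIDs. In-tree, a comparable approach could combine `list_sort()` and `list_oid_cmp()` with `list_deduplicate_oid()`.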
>
> For the initial tablesync case, this could be optimized further by
> introducing a new SQL function that returns the column list and row
> filter expression of a specific table. That way, we can filter by
> relid at an early stage instead of gathering information for all
> tables and then filtering by relid, as the tablesync worker does
> today, avoiding the overhead of collecting system catalog scan results.
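
The difference between filtering late and filtering early can be shown with a toy in-memory model. It is a sketch under simplifying assumptions: `PubRelRow` is a hypothetical, cut-down stand-in for a `pg_publication_rel` row, and a `bsearch()` over rows sorted by `(relid, pubid)` stands in for the syscache lookup on that same key (the patch looks up `PUBLICATIONRELMAP` with `relid` and the publication OID). The late variant copies every row of the publication before picking out one `relid`, so its cost grows with the publication size; the early variant touches only the requested key.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Oid;    /* stand-in for PostgreSQL's Oid type */

/* Hypothetical, simplified stand-in for a pg_publication_rel row. */
typedef struct PubRelRow
{
    Oid relid;
    Oid pubid;
    int ncols;           /* size of the column list; 0 means no list */
} PubRelRow;

/* Order rows by (relid, pubid), the key used for the lookup. */
static int
pubrel_cmp(const void *p1, const void *p2)
{
    const PubRelRow *r1 = p1;
    const PubRelRow *r2 = p2;

    if (r1->relid != r2->relid)
        return (r1->relid > r2->relid) - (r1->relid < r2->relid);
    return (r1->pubid > r2->pubid) - (r1->pubid < r2->pubid);
}

/*
 * Filter late: materialize every row of the publication into a scratch
 * buffer (cost grows with the publication size), then pick out relid.
 */
static const PubRelRow *
lookup_filter_late(const PubRelRow *rows, size_t n, Oid pubid, Oid relid,
                   PubRelRow *scratch, size_t *materialized)
{
    size_t m = 0;

    for (size_t i = 0; i < n; i++)
        if (rows[i].pubid == pubid)
            scratch[m++] = rows[i];
    *materialized = m;

    for (size_t i = 0; i < m; i++)
        if (scratch[i].relid == relid)
            return &scratch[i];
    return NULL;
}

/*
 * Filter early: a keyed O(log n) lookup on (relid, pubid). The rows must
 * already be sorted with pubrel_cmp; nothing is materialized for other
 * tables.
 */
static const PubRelRow *
lookup_filter_early(const PubRelRow *sorted_rows, size_t n, Oid pubid,
                    Oid relid)
{
    PubRelRow key = {relid, pubid, 0};

    return bsearch(&key, sorted_rows, n, sizeof(PubRelRow), pubrel_cmp);
}
```

In this model, a publication with 50k tables means the late variant copies 50k rows for every single-table lookup, whereas the early variant performs one binary search.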

I've drafted this idea, and it looks like a better approach. The patch
introduces a pg_get_publication_table_info() SQL function that returns
the column list and row filter expression in the same form as
pg_get_publication_tables(), but checks only the specified table. In
my environment, the tablesync worker's query in question drops from
288 ms to 0.6 ms with 50k tables in one publication.

Feedback is very welcome.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
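
To make the per-table check concrete, here is a simplified model of the decision pg_get_publication_table_info() makes before returning a row, following the branch order in the patch. The catalog is reduced to plain OID arrays, and `PubModel`, `RelModel`, and `table_is_published()` are hypothetical names; the ancestor loop only approximates `GetTopMostAncestorInPublication()`, which checks each ancestor's `pg_publication_rel` entries and the publications of its schema.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Oid;    /* stand-in for PostgreSQL's Oid type */

/* Hypothetical, simplified view of one publication's catalog state. */
typedef struct PubModel
{
    bool        alltables;         /* FOR ALL TABLES */
    bool        pubviaroot;        /* publish_via_partition_root */
    const Oid  *rel_members;       /* tables in pg_publication_rel */
    size_t      n_rel_members;
    const Oid  *schema_members;    /* schemas in pg_publication_namespace */
    size_t      n_schema_members;
} PubModel;

/* Hypothetical, simplified view of the table being checked. */
typedef struct RelModel
{
    Oid         relid;
    Oid         nspid;             /* the table's schema */
    bool        relispartition;
    const Oid  *ancestors;         /* partition ancestors */
    const Oid  *ancestor_nsps;     /* schema of each ancestor */
    size_t      nancestors;
} RelModel;

static bool
oid_in(const Oid *arr, size_t n, Oid v)
{
    for (size_t i = 0; i < n; i++)
        if (arr[i] == v)
            return true;
    return false;
}

/* Follows the branch order of the check in pg_get_publication_table_info(). */
static bool
table_is_published(const PubModel *pub, const RelModel *rel)
{
    assert(pub != NULL && rel != NULL);

    if (pub->alltables)
        return true;

    if (!pub->pubviaroot && rel->relispartition)
    {
        /* Approximates GetTopMostAncestorInPublication(): is any ancestor
         * listed directly, or through its schema? */
        for (size_t i = 0; i < rel->nancestors; i++)
            if (oid_in(pub->rel_members, pub->n_rel_members, rel->ancestors[i]) ||
                oid_in(pub->schema_members, pub->n_schema_members,
                       rel->ancestor_nsps[i]))
                return true;
        return false;
    }

    /* Direct pg_publication_rel entry, or the table's schema is published. */
    return oid_in(pub->rel_members, pub->n_rel_members, rel->relid) ||
           oid_in(pub->schema_members, pub->n_schema_members, rel->nspid);
}
```

As in the patch, the branches are mutually exclusive: once the partition branch is taken (the publication does not publish via the root and the relation is a partition), only the ancestors are consulted.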
From 54af2b794d741865fd06e97738b7fdb34e29b17e Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <[email protected]>
Date: Wed, 25 Feb 2026 10:56:45 -0800
Subject: [PATCH] Add pg_get_publication_table_info() to optimize logical replication tablesync.

---
 src/backend/catalog/pg_publication.c        | 219 +++++++++++++++++++-
 src/backend/replication/logical/tablesync.c |   9 +-
 src/include/catalog/pg_proc.dat             |   9 +
 3 files changed, 231 insertions(+), 6 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9a4791c573e..0a3015ffc91 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1116,6 +1116,108 @@ GetPublicationByName(const char *pubname, bool missing_ok)
 	return OidIsValid(oid) ? GetPublication(oid) : NULL;
 }
 
+/*
+ * pg_get_publication_tables() and pg_get_publication_table_info() use
+ * the same record type.
+ */
+#define NUM_PUBLICATION_TABLES_ELEM 4
+
+/*
+ * Common routine for pg_get_publication_tables() and
+ * pg_get_publication_table_info() to construct the result tuple.
+ */
+static HeapTuple
+construct_published_rel_tuple(published_rel *table_info, TupleDesc tuple_desc)
+{
+	Publication *pub;
+	Oid			relid = table_info->relid;
+	Oid			schemaid = get_rel_namespace(relid);
+	HeapTuple	pubtuple = NULL;
+	Datum		values[NUM_PUBLICATION_TABLES_ELEM] = {0};
+	bool		nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
+
+	pub = GetPublication(table_info->pubid);
+
+	values[0] = ObjectIdGetDatum(pub->oid);
+	values[1] = ObjectIdGetDatum(relid);
+
+	/*
+	 * We don't consider row filters or column lists for FOR ALL TABLES or
+	 * FOR TABLES IN SCHEMA publications.
+	 */
+	if (!pub->alltables &&
+		!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+							   ObjectIdGetDatum(schemaid),
+							   ObjectIdGetDatum(pub->oid)))
+		pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
+									   ObjectIdGetDatum(relid),
+									   ObjectIdGetDatum(pub->oid));
+
+	if (HeapTupleIsValid(pubtuple))
+	{
+		/* Look up the column list attribute. */
+		values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+									Anum_pg_publication_rel_prattrs,
+									&(nulls[2]));
+
+		/* Null indicates no filter. */
+		values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+									Anum_pg_publication_rel_prqual,
+									&(nulls[3]));
+	}
+	else
+	{
+		nulls[2] = true;
+		nulls[3] = true;
+	}
+
+	/* Show all columns when the column list is not specified. */
+	if (nulls[2])
+	{
+		Relation	rel = table_open(relid, AccessShareLock);
+		int			nattnums = 0;
+		int16	   *attnums;
+		TupleDesc	desc = RelationGetDescr(rel);
+		int			i;
+
+		attnums = palloc_array(int16, desc->natts);
+
+		for (i = 0; i < desc->natts; i++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, i);
+
+			if (att->attisdropped)
+				continue;
+
+			if (att->attgenerated)
+			{
+				/* We only support replication of STORED generated cols. */
+				if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
+					continue;
+
+				/*
+				 * User hasn't requested to replicate STORED generated
+				 * cols.
+				 */
+				if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
+					continue;
+			}
+
+			attnums[nattnums++] = att->attnum;
+		}
+
+		if (nattnums > 0)
+		{
+			values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
+			nulls[2] = false;
+		}
+
+		table_close(rel, AccessShareLock);
+	}
+
+	return heap_form_tuple(tuple_desc, values, nulls);
+}
+
 /*
  * Get information of the tables in the given publication array.
  *
@@ -1124,7 +1226,6 @@ GetPublicationByName(const char *pubname, bool missing_ok)
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
-#define NUM_PUBLICATION_TABLES_ELEM 4
 	FuncCallContext *funcctx;
 	List	   *table_infos = NIL;
 
@@ -1342,6 +1443,122 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 	SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * Similar to pg_get_publication_tables(), but retrieves publication
+ * information only for the specified table. This function is useful for
+ * obtaining the column filter list and row filter expression for a specific
+ * table without processing all tables in a publication. It is significantly
+ * faster than pg_get_publication_tables() because it avoids constructing
+ * a list of all table OIDs.
+ */
+Datum
+pg_get_publication_table_info(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	published_rel *table_info = NULL;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		TupleDesc	tupdesc;
+		MemoryContext oldcontext;
+		Oid			relid;
+		Name		pubname;
+		Relation	rel;
+		Publication *pub;
+		bool		publish = false;
+		published_rel *pubrel = NULL;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		relid = PG_GETARG_OID(0);
+		pubname = PG_GETARG_NAME(1);
+
+		rel = table_open(relid, AccessShareLock);
+		pub = GetPublicationByName(NameStr(*pubname), false);
+
+		/*
+		 * Verify that the specified table is published by the given
+		 * publication.
+		 */
+		if (pub->alltables)
+		{
+			/* ALL TABLES publication */
+			publish = true;
+		}
+		else if (!pub->pubviaroot && rel->rd_rel->relispartition)
+		{
+			List	   *ancestors = get_partition_ancestors(RelationGetRelid(rel));
+
+			/*
+			 * Check whether an ancestor is in the specified publication,
+			 * since publications with publish_via_partition_root = false
+			 * create pg_publication_rel entries only for the topmost
+			 * partitioned table.
+			 */
+			if (OidIsValid(GetTopMostAncestorInPublication(pub->oid, ancestors,
+														   NULL)))
+				publish = true;
+		}
+		else if (SearchSysCacheExists2(PUBLICATIONRELMAP,
+									   ObjectIdGetDatum(RelationGetRelid(rel)),
+									   ObjectIdGetDatum(pub->oid)) ||
+				 SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+									   ObjectIdGetDatum(RelationGetNamespace(rel)),
+									   ObjectIdGetDatum(pub->oid)))
+		{
+			/*
+			 * Found the entry in pg_publication_rel or
+			 * pg_publication_namespace.
+			 */
+			publish = true;
+		}
+
+		table_close(rel, AccessShareLock);
+
+		/* Construct a tuple descriptor for the result rows. */
+		tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
+						   INT2VECTOROID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
+						   PG_NODE_TREEOID, -1, 0);
+
+		if (publish)
+		{
+			pubrel = palloc_object(published_rel);
+			pubrel->relid = relid;
+			pubrel->pubid = pub->oid;
+		}
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+		funcctx->user_fctx = pubrel;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	table_info = (published_rel *) funcctx->user_fctx;
+
+	if (table_info && funcctx->call_cntr == 0)
+	{
+		HeapTuple	rettuple;
+
+		rettuple = construct_published_rel_tuple(table_info, funcctx->tuple_desc);
+
+		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
 /*
  * Returns Oids of sequences in a publication.
  */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2f2f0121ecf..5331eb034b0 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -801,9 +801,9 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
 						 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
 						 " THEN NULL ELSE gpt.attrs END)"
 						 " FROM pg_publication p,"
-						 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
+						 " LATERAL pg_get_publication_table_info(%u, p.pubname) gpt,"
 						 " pg_class c"
-						 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
+						 " WHERE c.oid = gpt.relid"
 						 " AND p.pubname IN ( %s )",
 						 lrel->remoteid,
 						 pub_names->data);
@@ -983,9 +983,8 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
 		appendStringInfo(&cmd,
 						 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
 						 " FROM pg_publication p,"
-						 " LATERAL pg_get_publication_tables(p.pubname) gpt"
-						 " WHERE gpt.relid = %u"
-						 " AND p.pubname IN ( %s )",
+						 " LATERAL pg_get_publication_table_info(%u, p.pubname) gpt"
+						 " WHERE p.pubname IN ( %s )",
 						 lrel->remoteid,
 						 pub_names->data);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index dac40992cbc..3cd6004d7dc 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12388,6 +12388,15 @@
   proargmodes => '{v,o,o,o,o}',
   proargnames => '{pubname,pubid,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
+{ oid => '9761',
+  descr => 'get information of the table that is part of the specified publication',
+  proname => 'pg_get_publication_table_info', prorows => '1',
+  proretset => 't', provolatile => 's',
+  prorettype => 'record', proargtypes => 'oid name',
+  proallargtypes => '{oid,name,oid,oid,int2vector,pg_node_tree}',
+  proargmodes => '{i,i,o,o,o,o}',
+  proargnames => '{relid,pubname,pubid,relid,attrs,qual}',
+  prosrc => 'pg_get_publication_table_info' },
 { oid => '8052', descr => 'get OIDs of sequences in a publication',
   proname => 'pg_get_publication_sequences', prorows => '1000',
   proretset => 't', provolatile => 's', prorettype => 'oid', proargtypes => 'text',
-- 
2.53.0
