Hi Dharin,

Attached is an updated patch that addresses your review comments and fixes the concurrency model for the non-concurrent path.

Regarding the items you brought up:

1. Fixed the indnatts vs. indnkeyatts issue when generating the ON CONFLICT target.
2. Removed the incorrect regression test comment regarding subqueries.

Regarding the concurrency gap and safety model:

In my last email, I mentioned planning to use transaction-level advisory locks to close the consistency gap. After prototyping that approach for a while, I had to abandon it. Testing showed that it falls over at scale: it quickly hits the `max_locks_per_transaction` limit and causes problems with bulk operations.

Instead, the non-concurrent partial refresh now uses a two-step strategy:

1. It first executes a `SELECT 1 FROM mv WHERE ... FOR UPDATE` to lock the existing rows matching the predicate. This serializes concurrent partial refreshes on overlapping rows while allowing non-overlapping refreshes to proceed in parallel.
2. It then executes a single CTE that evaluates the underlying query, upserts the results into the matview, and deletes matview rows within the predicate that no longer appear in the query output (via an anti-join).

In my testing, this approach performs similarly to the original implementation and has not exhibited the original implementation's correctness issues.

Thanks,
Adam Brusselback
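To make the two statements concrete, here is a standalone C sketch of how they could be assembled as SQL text. It is not the patch's code: it swaps PostgreSQL's StringInfo for fixed-size buffers and vsnprintf, assumes the identifiers and the deparsed WHERE clause arrive already quoted (the patch uses quote_identifier() and pg_get_expr() for that), and the names SketchIndex, appendf, build_lock_sql and build_refresh_sql are hypothetical. It also illustrates item 1: the ON CONFLICT target and the anti-join iterate only over the index's key columns (indnkeyatts), while INCLUDE columns (counted in indnatts but not in indnkeyatts) are treated as ordinary non-key columns in DO UPDATE SET.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for the pg_index fields the patch reads: indkey[]
 * holds 1-based attribute numbers; the first nkeyatts are key columns, the
 * rest (up to natts) are INCLUDE columns.
 */
typedef struct SketchIndex
{
    int         nkeyatts;   /* like indnkeyatts */
    int         natts;      /* like indnatts: key + INCLUDE columns */
    const int  *indkey;
} SketchIndex;

/* Append formatted text; on overflow *pos becomes cap and later calls are no-ops. */
static void
appendf(char *buf, size_t cap, size_t *pos, const char *fmt, ...)
{
    va_list     ap;
    int         n;

    if (*pos >= cap)
        return;
    va_start(ap, fmt);
    n = vsnprintf(buf + *pos, cap - *pos, fmt, ap);
    va_end(ap);
    *pos = (n < 0 || (size_t) n >= cap - *pos) ? cap : *pos + (size_t) n;
}

static int
is_key_column(const SketchIndex *idx, int attnum)
{
    for (int j = 0; j < idx->nkeyatts; j++)     /* key columns only */
        if (idx->indkey[j] == attnum)
            return 1;
    return 0;
}

/* Step 1: lock the existing matview rows that match the predicate. */
static int
build_lock_sql(char *buf, size_t cap, const char *mv, const char *qual)
{
    size_t      pos = 0;

    appendf(buf, cap, &pos, "SELECT 1 FROM %s mv WHERE (%s) FOR UPDATE", mv, qual);
    return pos < cap ? 0 : -1;
}

/* Step 2: evaluate the view query, upsert the results, delete stale rows. */
static int
build_refresh_sql(char *buf, size_t cap, const char *mv, const char *view_sql,
                  const char *alias, const char *qual,
                  const char *const *cols, int ncols, const SketchIndex *idx)
{
    char        conflict[256] = "", join[512] = "", setc[512] = "";
    size_t      cpos = 0, jpos = 0, spos = 0, pos = 0;

    assert(idx->nkeyatts > 0 && idx->nkeyatts <= idx->natts);

    /* ON CONFLICT target and anti-join use indnkeyatts, never indnatts. */
    for (int i = 0; i < idx->nkeyatts; i++)
    {
        const char *c = cols[idx->indkey[i] - 1];

        appendf(conflict, sizeof(conflict), &cpos, "%s%s", i ? ", " : "", c);
        appendf(join, sizeof(join), &jpos, "%snd.%s IS NOT DISTINCT FROM mv.%s",
                i ? " AND " : "", c, c);
    }
    /* Every non-key column, including INCLUDE columns, is overwritten. */
    for (int i = 0; i < ncols; i++)
        if (!is_key_column(idx, i + 1))
            appendf(setc, sizeof(setc), &spos, "%s%s = EXCLUDED.%s",
                    spos ? ", " : "", cols[i], cols[i]);
    if (cpos >= sizeof(conflict) || jpos >= sizeof(join) || spos >= sizeof(setc))
        return -1;

    appendf(buf, cap, &pos,
            "WITH new_data AS MATERIALIZED (SELECT * FROM (%s) %s WHERE (%s)), "
            "upsert AS (INSERT INTO %s SELECT * FROM new_data ON CONFLICT (%s) ",
            view_sql, alias, qual, mv, conflict);
    if (spos > 0)
        appendf(buf, cap, &pos, "DO UPDATE SET %s) ", setc);
    else
        appendf(buf, cap, &pos, "DO NOTHING) ");
    appendf(buf, cap, &pos,
            "DELETE FROM %s mv WHERE (%s) AND NOT EXISTS "
            "(SELECT 1 FROM new_data nd WHERE %s)", mv, qual, join);
    return pos < cap ? 0 : -1;
}
```

For example, with columns id, region, total and a unique index on (id) INCLUDE (total), build_refresh_sql() emits ON CONFLICT (id) and a SET list covering both region and total. If every column is a key column, it falls back to DO NOTHING, mirroring the has_non_key_cols branch in the patch.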
From 2a2d7ba4a3c68d64717577e47e4dfd6f724e620a Mon Sep 17 00:00:00 2001 From: Adam Brusselback <[email protected]> Date: Thu, 9 Apr 2026 13:16:34 -0400 Subject: [PATCH v2] Add support for partial matview refresh using REFRESH MATERIALIZED VIEW ... WHERE ...` --- .../sgml/ref/refresh_materialized_view.sgml | 44 +- src/backend/commands/createas.c | 4 +- src/backend/commands/matview.c | 813 ++++++++++++++++-- src/backend/executor/execMain.c | 3 +- src/backend/parser/gram.y | 14 +- src/backend/tcop/utility.c | 2 +- src/include/commands/matview.h | 6 +- src/include/nodes/parsenodes.h | 1 + src/test/regress/expected/matview_where.out | 344 ++++++++ src/test/regress/parallel_schedule | 5 + src/test/regress/sql/matview_where.sql | 302 +++++++ 11 files changed, 1468 insertions(+), 70 deletions(-) create mode 100644 src/test/regress/expected/matview_where.out create mode 100644 src/test/regress/sql/matview_where.sql diff --git a/doc/src/sgml/ref/refresh_materialized_view.sgml b/doc/src/sgml/ref/refresh_materialized_view.sgml index 8ed43ade803..ca812ddcdf8 100644 --- a/doc/src/sgml/ref/refresh_materialized_view.sgml +++ b/doc/src/sgml/ref/refresh_materialized_view.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> REFRESH MATERIALIZED VIEW [ CONCURRENTLY ] <replaceable class="parameter">name</replaceable> - [ WITH [ NO ] DATA ] + [ WITH [ NO ] DATA ] [ WHERE <replaceable class="parameter">condition</replaceable> ] </synopsis> </refsynopsisdiv> @@ -44,6 +44,15 @@ REFRESH MATERIALIZED VIEW [ CONCURRENTLY ] <replaceable class="parameter">name</ <literal>CONCURRENTLY</literal> and <literal>WITH NO DATA</literal> may not be specified together. </para> + <para> + If a <literal>WHERE</literal> clause is specified, only the rows matching + the <replaceable class="parameter">condition</replaceable> are updated. 
+ Rows in the materialized view that match the condition but are no longer + present in the underlying base tables (or no longer match the query definition) + are deleted. New rows from the base tables that match the condition are inserted. + Rows in the materialized view that do not match the condition are left unchanged. + The <literal>WHERE</literal> clause cannot be used with <literal>WITH NO DATA</literal>. + </para> </refsect1> <refsect1> @@ -87,6 +96,29 @@ REFRESH MATERIALIZED VIEW [ CONCURRENTLY ] <replaceable class="parameter">name</ </para> </listitem> </varlistentry> + + <varlistentry> + <term><literal>WHERE</literal> <replaceable class="parameter">condition</replaceable></term> + <listitem> + <para> + A <literal>WHERE</literal> clause specifying a condition that determines + which rows to refresh. The condition applies to both the existing data + in the materialized view and the new data generated by the view's defining query. + </para> + <para> + When a <literal>WHERE</literal> clause is used without + <literal>CONCURRENTLY</literal>, the operation requires a + <literal>ROW EXCLUSIVE</literal> lock, which allows concurrent reads on the + materialized view but blocks other modification commands. This is a lower + lock level than the <literal>ACCESS EXCLUSIVE</literal> lock required by a + full refresh. + </para> + <para> + The <literal>WHERE</literal> clause cannot contain volatile functions or + aggregates. The materialized view must be already populated to use this option. 
+ </para> + </listitem> + </varlistentry> </variablelist> </refsect1> @@ -125,7 +157,17 @@ REFRESH MATERIALIZED VIEW order_summary; state: <programlisting> REFRESH MATERIALIZED VIEW annual_statistics_basis WITH NO DATA; +</programlisting> + </para> + + <para> + This command will update only the rows in the materialized view + <literal>order_summary</literal> where the <literal>order_date</literal> + is in the year 2024: +<programlisting> +REFRESH MATERIALIZED VIEW order_summary WHERE order_date >= '2024-01-01' AND order_date < '2025-01-01'; </programlisting></para> + </refsect1> <refsect1> diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 6dbb831ca89..5015562dae7 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -293,8 +293,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * reduces the chance that a subsequent refresh will fail. */ if (do_refresh) - RefreshMatViewByOid(address.objectId, true, false, false, - pstate->p_sourcetext, qc); + RefreshMatViewByOid(address.objectId, true, false, false, NULL, + pstate->p_sourcetext, params, qc); } else diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index f7d8007f796..1dc7358fa52 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -32,12 +32,20 @@ #include "executor/spi.h" #include "miscadmin.h" #include "pgstat.h" +#include "optimizer/optimizer.h" +#include "parser/parse_clause.h" +#include "parser/parse_coerce.h" +#include "parser/parse_expr.h" +#include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/hsearch.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" +#include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -53,6 +61,28 @@ typedef struct BulkInsertState bistate; /* bulk insert state */ } 
DR_transientrel; +/* + * Session-level cache for Partial Refresh plans. + * We cache the prepared SPI plans for both the row-locking and refresh steps + * avoiding expensive calls (pg_get_viewdef) and parsing on every execution. + */ +typedef struct MatViewPartialRefreshCache +{ + Oid matviewOid; /* Hash Key */ + + /* Validation fields */ + Oid uniqueIndexOid; /* The unique index used for conflict + * resolution */ + char *whereClauseStr; /* The WHERE clause string used to build the + * plans */ + + /* The cached plans */ + SPIPlanPtr lockPlan; /* SELECT FOR UPDATE */ + SPIPlanPtr refreshPlan; /* Fused CTE: Evaluate -> Upsert -> Delete */ +} MatViewPartialRefreshCache; + +static HTAB *MatViewRefreshCache = NULL; + static int matview_maintenance_depth = 0; static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); @@ -62,11 +92,19 @@ static void transientrel_destroy(DestReceiver *self); static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString, bool is_create); static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, - int save_sec_context); + int save_sec_context, char *whereClauseStr, + ParamListInfo params); +static uint64 refresh_by_direct_modification(Oid matviewOid, Oid relowner, + int save_sec_context, char *whereClauseStr, + ParamListInfo params); static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence); static bool is_usable_unique_index(Relation indexRel); static void OpenMatViewIncrementalMaintenance(void); static void CloseMatViewIncrementalMaintenance(void); +static int matview_execute_spi(const char *command, ParamListInfo params, bool read_only); +static int matview_execute_spi_plan(SPIPlanPtr plan, ParamListInfo params, bool read_only); +static char *get_matview_view_query(Oid matviewOid); +static void InitMatViewCache(void); /* * SetMatViewPopulatedState @@ -79,9 +117,17 @@ SetMatViewPopulatedState(Relation relation, bool newstate) 
{ Relation pgrel; HeapTuple tuple; + Form_pg_class classForm; Assert(relation->rd_rel->relkind == RELKIND_MATVIEW); + /* + * If the state matches, do nothing. This prevents cache invalidation + * storms when doing frequent partial refreshes via triggers. + */ + if (relation->rd_rel->relispopulated == newstate) + return; + /* * Update relation's pg_class entry. Crucial side-effect: other backends * (and this one too!) are sent SI message to make them rebuild relcache @@ -94,9 +140,13 @@ SetMatViewPopulatedState(Relation relation, bool newstate) elog(ERROR, "cache lookup failed for relation %u", RelationGetRelid(relation)); - ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate; + classForm = (Form_pg_class) GETSTRUCT(tuple); - CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); + if (classForm->relispopulated != newstate) + { + classForm->relispopulated = newstate; + CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); + } heap_freetuple(tuple); table_close(pgrel, RowExclusiveLock); @@ -108,23 +158,183 @@ SetMatViewPopulatedState(Relation relation, bool newstate) CommandCounterIncrement(); } +/* + * Hook to allow parameters (e.g. $1) in the WHERE clause. + */ +static Node * +refresh_paramref_hook(ParseState *pstate, ParamRef *pref) +{ + ParamListInfo params = (ParamListInfo) pstate->p_ref_hook_state; + Param *param; + + param = makeNode(Param); + param->paramkind = PARAM_EXTERN; + param->paramid = pref->number; + param->paramtype = UNKNOWNOID; + param->paramtypmod = -1; + param->paramcollid = InvalidOid; + param->location = pref->location; + + if (params && pref->number > 0 && pref->number <= params->numParams) + { + Oid ptype = params->params[pref->number - 1].ptype; + + if (OidIsValid(ptype)) + param->paramtype = ptype; + } + + return (Node *) param; +} + +/* + * Transform the WHERE clause for REFRESH MATERIALIZED VIEW. 
+ */ +static Node * +transformRefreshWhereClause(Oid relid, Node *whereClause, ParamListInfo params) +{ + ParseState *pstate = make_parsestate(NULL); + Relation rel = table_open(relid, NoLock); + ParseNamespaceItem *nsitem; + Node *result; + + pstate->p_paramref_hook = refresh_paramref_hook; + pstate->p_ref_hook_state = (void *) params; + + nsitem = addRangeTableEntryForRelation(pstate, rel, AccessShareLock, NULL, false, true); + addNSItemToQuery(pstate, nsitem, false, true, true); + + result = transformExpr(pstate, whereClause, EXPR_KIND_WHERE); + result = coerce_to_boolean(pstate, result, "WHERE"); + + if (contain_volatile_functions(result)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("WHERE clause in REFRESH MATERIALIZED VIEW cannot contain volatile functions"))); + + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("WHERE clause in REFRESH MATERIALIZED VIEW cannot contain aggregates"))); + + table_close(rel, NoLock); + free_parsestate(pstate); + + return result; +} + +static char * +deparseRefreshWhereClause(Oid relid, Node *whereClause) +{ + return TextDatumGetCString(DirectFunctionCall2(pg_get_expr, + CStringGetTextDatum(nodeToString(whereClause)), + ObjectIdGetDatum(relid))); +} + +/* + * Helper to execute SPI commands with optional parameters. + */ +static int +matview_execute_spi(const char *command, ParamListInfo params, bool read_only) +{ + if (params && params->numParams > 0) + { + Oid *argtypes; + Datum *argvalues; + char *nulls; + int i; + int res; + + argtypes = (Oid *) palloc(params->numParams * sizeof(Oid)); + argvalues = (Datum *) palloc(params->numParams * sizeof(Datum)); + nulls = (char *) palloc(params->numParams * sizeof(char)); + + for (i = 0; i < params->numParams; i++) + { + ParamExternData *prm = ¶ms->params[i]; + + argtypes[i] = prm->ptype; + argvalues[i] = prm->value; + nulls[i] = prm->isnull ? 
'n' : ' '; + } + + res = SPI_execute_with_args(command, params->numParams, argtypes, + argvalues, nulls, read_only, 0); + + pfree(argtypes); + pfree(argvalues); + pfree(nulls); + + return res; + } + else + { + return SPI_exec(command, 0); + } +} + +/* + * Helper to execute Prepared SPI Plans with optional parameters. + */ +static int +matview_execute_spi_plan(SPIPlanPtr plan, ParamListInfo params, bool read_only) +{ + if (params && params->numParams > 0) + { + Datum *argvalues; + char *nulls; + int i; + int res; + + argvalues = (Datum *) palloc(params->numParams * sizeof(Datum)); + nulls = (char *) palloc(params->numParams * sizeof(char)); + + for (i = 0; i < params->numParams; i++) + { + ParamExternData *prm = ¶ms->params[i]; + + argvalues[i] = prm->value; + nulls[i] = prm->isnull ? 'n' : ' '; + } + + res = SPI_execute_plan(plan, argvalues, nulls, read_only, 0); + + pfree(argvalues); + pfree(nulls); + + return res; + } + else + { + return SPI_execute_plan(plan, NULL, NULL, read_only, 0); + } +} + /* * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command * - * If WITH NO DATA was specified, this is effectively like a TRUNCATE; - * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT - * statement associated with the materialized view. The statement node's - * skipData field shows whether the clause was used. + * This is the entry point for REFRESH MATERIALIZED VIEW. It handles: + * + * - WITH NO DATA: effectively like a TRUNCATE. + * - CONCURRENTLY: diff-based refresh allowing concurrent reads. + * - WHERE clause: partial refresh of a subset of rows. + * - Default: full rebuild via heap swap. + * + * The statement node's skipData field shows whether WITH NO DATA was used. */ ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, - QueryCompletion *qc) + ParamListInfo params, QueryCompletion *qc) { Oid matviewOid; LOCKMODE lockmode; /* Determine strength of lock needed. */ - lockmode = stmt->concurrent ? 
ExclusiveLock : AccessExclusiveLock; + if (stmt->concurrent) + lockmode = ExclusiveLock; + else if (stmt->whereClause) + lockmode = RowExclusiveLock; + else + lockmode = AccessExclusiveLock; /* * Get a lock until end of transaction. @@ -135,24 +345,40 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, NULL); return RefreshMatViewByOid(matviewOid, false, stmt->skipData, - stmt->concurrent, queryString, qc); + stmt->concurrent, stmt->whereClause, + queryString, params, qc); } /* * RefreshMatViewByOid -- refresh materialized view by OID * - * This refreshes the materialized view by creating a new table and swapping - * the relfilenumbers of the new table and the old materialized view, so the OID - * of the original materialized view is preserved. Thus we do not lose GRANT - * nor references to this materialized view. + * This refreshes a materialized view using one of three strategies: + * + * 1. Partial non-concurrent (WHERE clause, no CONCURRENTLY): + * Directly modifies the matview in-place using a two-step approach + * (SELECT FOR UPDATE followed by a CTE upsert/delete). + * Uses RowExclusiveLock, allowing concurrent reads and concurrent writes + * to non-overlapping rows. Overlapping writes are serialized by row locks. + * + * 2. Concurrent (CONCURRENTLY, with or without WHERE clause): + * Creates a temporary table with new data, computes a diff against + * the existing matview, and applies changes. Uses ExclusiveLock, + * allowing concurrent reads throughout the operation but blocking all + * concurrent writes. + * + * 3. Full rebuild (default, no WHERE, no CONCURRENTLY): + * Creates a new heap, populates it, and swaps relfilenumbers. + * Uses AccessExclusiveLock, blocking all concurrent access. + * The OID of the original materialized view is preserved, so we + * do not lose GRANT nor references to this materialized view. 
* * If skipData is true, this is effectively like a TRUNCATE; otherwise it is * like a TRUNCATE followed by an INSERT using the SELECT statement associated * with the materialized view. * - * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading - * the new heap, it's better to create the indexes afterwards than to fill them - * incrementally while we load. + * For full rebuild, indexes are rebuilt too, via REINDEX. Since we are + * effectively bulk-loading the new heap, it's better to create the indexes + * afterwards than to fill them incrementally while we load. * * The matview's "populated" state is changed based on whether the contents * reflect the result set of the materialized view's query. @@ -162,22 +388,22 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ ObjectAddress RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData, - bool concurrent, const char *queryString, + bool concurrent, Node *whereClause, + const char *queryString, ParamListInfo params, QueryCompletion *qc) { Relation matviewRel; RewriteRule *rule; List *actions; Query *dataQuery; - Oid tableSpace; Oid relowner; - Oid OIDNewHeap; uint64 processed = 0; - char relpersistence; Oid save_userid; int save_sec_context; int save_nestlevel; ObjectAddress address; + Node *qual = NULL; + char *qual_str = NULL; matviewRel = table_open(matviewOid, NoLock); relowner = matviewRel->rd_rel->relowner; @@ -206,7 +432,11 @@ RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CONCURRENTLY cannot be used when the materialized view is not populated"))); - /* Check that conflicting options have not been specified. 
*/ + if (whereClause && !RelationIsPopulated(matviewRel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("WHERE clause cannot be used when the materialized view is not populated"))); + if (concurrent && skipData) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -240,6 +470,12 @@ RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData, "the rule for materialized view \"%s\" is not a single action", RelationGetRelationName(matviewRel)); + if (whereClause) + { + qual = transformRefreshWhereClause(matviewOid, whereClause, params); + qual_str = deparseRefreshWhereClause(matviewOid, qual); + } + /* * Check that there is a unique index with no WHERE clause on one or more * columns of the materialized view if CONCURRENTLY is specified. @@ -298,47 +534,83 @@ RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData, */ SetMatViewPopulatedState(matviewRel, !skipData); - /* Concurrent refresh builds new data in temp tablespace, and does diff. */ - if (concurrent) - { - tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false); - relpersistence = RELPERSISTENCE_TEMP; - } - else + /* + * STRATEGY 1: PARTIAL NON-CONCURRENT + */ + if (qual && !concurrent && !skipData) { - tableSpace = matviewRel->rd_rel->reltablespace; - relpersistence = matviewRel->rd_rel->relpersistence; + processed = refresh_by_direct_modification(matviewOid, relowner, + save_sec_context, qual_str, + params); } /* - * Create the transient table that will receive the regenerated data. Lock - * it against access by any other process until commit (by which time it - * will be gone). + * STRATEGY 2: CONCURRENT (PARTIAL or FULL) */ - OIDNewHeap = make_new_heap(matviewOid, tableSpace, - matviewRel->rd_rel->relam, - relpersistence, ExclusiveLock); - Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false)); - - /* Generate the data, if wanted. 
*/ - if (!skipData) + else if (concurrent) { - DestReceiver *dest; + Oid tableSpace; + char relpersistence; + Oid OIDNewHeap; + int old_depth = matview_maintenance_depth; - dest = CreateTransientRelDestReceiver(OIDNewHeap); - processed = refresh_matview_datafill(dest, dataQuery, queryString, - is_create); - } + tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false); + relpersistence = RELPERSISTENCE_TEMP; - /* Make the matview match the newly generated data. */ - if (concurrent) - { - int old_depth = matview_maintenance_depth; + /* + * Create the transient table that will receive the regenerated data. + * Lock it against access by any other process until commit (by which + * time it will be gone). + */ + OIDNewHeap = make_new_heap(matviewOid, tableSpace, + matviewRel->rd_rel->relam, + relpersistence, ExclusiveLock); + Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false)); + + /* Generate the data, if wanted. */ + if (!skipData) + { + if (qual_str) + { + StringInfoData buf; + char *view_sql = get_matview_view_query(matviewOid); + char *transient_name; + Relation transientRel = table_open(OIDNewHeap, NoLock); + + transient_name = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(transientRel)), + RelationGetRelationName(transientRel)); + table_close(transientRel, NoLock); + + /* + * Init buffer before SPI connection to avoid double free + * issues on context destroy + */ + initStringInfo(&buf); + appendStringInfo(&buf, "INSERT INTO %s SELECT * FROM (%s) _mv_q WHERE %s", + transient_name, view_sql, qual_str); + + SPI_connect(); + if (matview_execute_spi(buf.data, params, false) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", buf.data); + processed = SPI_processed; + SPI_finish(); + pfree(view_sql); + pfree(transient_name); + pfree(buf.data); + } + else + { + DestReceiver *dest; + + dest = CreateTransientRelDestReceiver(OIDNewHeap); + processed = refresh_matview_datafill(dest, dataQuery, queryString, is_create); + } + 
} PG_TRY(); { refresh_by_match_merge(matviewOid, OIDNewHeap, relowner, - save_sec_context); + save_sec_context, qual_str, params); } PG_CATCH(); { @@ -346,10 +618,34 @@ RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData, PG_RE_THROW(); } PG_END_TRY(); + Assert(matview_maintenance_depth == old_depth); } + + /* + * STRATEGY 3: FULL REBUILD + */ else { + Oid tableSpace; + char relpersistence; + Oid OIDNewHeap; + + tableSpace = matviewRel->rd_rel->reltablespace; + relpersistence = matviewRel->rd_rel->relpersistence; + + OIDNewHeap = make_new_heap(matviewOid, tableSpace, + matviewRel->rd_rel->relam, + relpersistence, AccessExclusiveLock); + + if (!skipData) + { + DestReceiver *dest; + + dest = CreateTransientRelDestReceiver(OIDNewHeap); + processed = refresh_matview_datafill(dest, dataQuery, queryString, is_create); + } + refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence); /* @@ -555,6 +851,355 @@ transientrel_destroy(DestReceiver *self) pfree(self); } +/* + * get_matview_view_query + * + * Retrieve the SQL definition of a materialized view's underlying query. + * Returns the query text with trailing semicolons and whitespace removed. 
+ */ +static char * +get_matview_view_query(Oid matviewOid) +{ + char *view_sql; + + view_sql = TextDatumGetCString(DirectFunctionCall2(pg_get_viewdef, + ObjectIdGetDatum(matviewOid), + BoolGetDatum(false))); + if (view_sql) + { + int len = strlen(view_sql); + + while (len > 0 && (view_sql[len - 1] == ';' || isspace((unsigned char) view_sql[len - 1]))) + view_sql[--len] = '\0'; + } + return view_sql; +} + +static void +InitMatViewCache(void) +{ + HASHCTL ctl; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(MatViewPartialRefreshCache); + ctl.hcxt = CacheMemoryContext; + + MatViewRefreshCache = hash_create("MatView Partial Refresh Cache", + 16, + &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * refresh_by_direct_modification + * + * This modifies the materialized view in-place without creating a temporary + * heap or swapping relfilenumbers. It requires a usable unique index on the + * matview for conflict resolution. + * + * Concurrency is handled in two steps, each executed as a separate SPI + * statement: + * + * 1. Lock existing rows matching the WHERE clause via SELECT FOR UPDATE. + * This serializes concurrent partial refreshes that touch overlapping + * rows while allowing non-overlapping refreshes to proceed in parallel. + * + * 2. Execute a single CTE that evaluates the underlying query, upserts + * the results into the matview, and deletes rows that no longer match + * the predicate (via anti-join against the fresh query output). + * + * To avoid rebuilding the SQL and re-preparing the SPI plans on every call, + * we cache both plans in a session-level hash table keyed by matview OID. + * + * Returns the number of rows processed by the refresh CTE. 
+ */ +static uint64 +refresh_by_direct_modification(Oid matviewOid, Oid relowner, + int save_sec_context, char *whereClauseStr, + ParamListInfo params) +{ + Relation matviewRel; + Oid uniqueIndexOid = InvalidOid; + List *indexoidlist; + ListCell *lc; + MatViewPartialRefreshCache *cacheEntry; + bool found; + uint64 result_processed = 0; + + matviewRel = table_open(matviewOid, NoLock); + + /* Find a usable unique index, preferring the primary key. */ + indexoidlist = RelationGetIndexList(matviewRel); + foreach(lc, indexoidlist) + { + Oid indexoid = lfirst_oid(lc); + Relation indexRel; + bool usable; + bool is_pk; + + indexRel = index_open(indexoid, AccessShareLock); + usable = is_usable_unique_index(indexRel); + is_pk = indexRel->rd_index->indisprimary; + index_close(indexRel, AccessShareLock); + + if (usable) + { + if (is_pk) + { + uniqueIndexOid = indexoid; + break; + } + if (!OidIsValid(uniqueIndexOid)) + uniqueIndexOid = indexoid; + } + } + list_free(indexoidlist); + + if (!OidIsValid(uniqueIndexOid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot perform partial refresh on materialized view \"%s\"", + RelationGetRelationName(matviewRel)), + errdetail("Partial refresh requires a usable unique index to perform an UPSERT operation."))); + + /* Look up or create a plan cache entry for this matview. */ + if (!MatViewRefreshCache) + InitMatViewCache(); + + cacheEntry = (MatViewPartialRefreshCache *) hash_search(MatViewRefreshCache, + &matviewOid, + HASH_ENTER, + &found); + + /* + * We have a cache hit ONLY if the entry exists, the unique index matches, + * and the WHERE clause string perfectly matches. We also ensure + * whereClauseStr is not NULL to prevent a strcmp segfault if a previous + * compilation failed midway. 
+ */ + if (found && + cacheEntry->uniqueIndexOid == uniqueIndexOid && + cacheEntry->whereClauseStr != NULL && + whereClauseStr != NULL && + strcmp(cacheEntry->whereClauseStr, whereClauseStr) == 0) + { + /* Cache is valid. Do nothing. */ + } + else + { + if (found) + { + /* Index or WHERE clause changed; discard stale plans. */ + if (cacheEntry->lockPlan) + SPI_freeplan(cacheEntry->lockPlan); + if (cacheEntry->refreshPlan) + SPI_freeplan(cacheEntry->refreshPlan); + if (cacheEntry->whereClauseStr) + pfree(cacheEntry->whereClauseStr); + } + + cacheEntry->lockPlan = NULL; + cacheEntry->refreshPlan = NULL; + cacheEntry->whereClauseStr = NULL; + } + + OpenMatViewIncrementalMaintenance(); + + SPI_connect(); + + /* Prepare plans if we don't have valid cached ones. */ + if (cacheEntry->lockPlan == NULL || cacheEntry->refreshPlan == NULL) + { + StringInfoData buf; + char *view_sql; + char *matview_name; + const char *matview_alias; + Oid *argtypes = NULL; + int nargs = 0; + Relation indexRel; + Form_pg_index indexStruct; + TupleDesc tupdesc = matviewRel->rd_att; + StringInfoData conflict_cols; + StringInfoData set_clause; + StringInfoData join_clause; + bool first; + bool has_non_key_cols = false; + int i; + MemoryContext oldcxt; + + matview_name = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel)); + matview_alias = quote_identifier(RelationGetRelationName(matviewRel)); + view_sql = get_matview_view_query(matviewOid); + + if (params && params->numParams > 0) + { + nargs = params->numParams; + argtypes = (Oid *) palloc(nargs * sizeof(Oid)); + for (i = 0; i < nargs; i++) + argtypes[i] = params->params[i].ptype; + } + + /* + * Prepare the row-locking statement. This acquires FOR UPDATE locks + * on matview rows matching the WHERE clause to serialize concurrent + * partial refreshes on overlapping rows. 
+ */ + initStringInfo(&buf); + appendStringInfo(&buf, "SELECT 1 FROM %s mv WHERE (%s) FOR UPDATE", + matview_name, whereClauseStr); + + cacheEntry->lockPlan = SPI_prepare(buf.data, nargs, argtypes); + if (cacheEntry->lockPlan == NULL) + elog(ERROR, "SPI_prepare failed for lock acquisition: %s", buf.data); + SPI_keepplan(cacheEntry->lockPlan); + + /* + * Build the refresh CTE: evaluate the underlying query with the WHERE + * predicate, upsert the results, and delete matview rows that no + * longer appear in the query output. + */ + indexRel = index_open(uniqueIndexOid, AccessShareLock); + indexStruct = indexRel->rd_index; + + initStringInfo(&conflict_cols); + initStringInfo(&set_clause); + initStringInfo(&join_clause); + + /* Build the ON CONFLICT column list and anti-join condition. */ + first = true; + for (i = 0; i < indexStruct->indnkeyatts; i++) + { + int attnum = indexStruct->indkey.values[i]; + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + const char *quoted; + + quoted = quote_identifier(NameStr(attr->attname)); + + if (!first) + { + appendStringInfoString(&conflict_cols, ", "); + appendStringInfoString(&join_clause, " AND "); + } + first = false; + + appendStringInfoString(&conflict_cols, quoted); + appendStringInfo(&join_clause, + "nd.%s IS NOT DISTINCT FROM mv.%s", + quoted, quoted); + } + + /* Build the DO UPDATE SET clause for non-key columns. 
*/ + first = true; + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i); + const char *quoted; + bool is_key = false; + int j; + + if (attr->attisdropped) + continue; + + for (j = 0; j < indexStruct->indnkeyatts; j++) + { + if (indexStruct->indkey.values[j] == (i + 1)) + { + is_key = true; + break; + } + } + + if (is_key) + continue; + + if (!first) + appendStringInfoString(&set_clause, ", "); + first = false; + has_non_key_cols = true; + + quoted = quote_identifier(NameStr(attr->attname)); + appendStringInfo(&set_clause, "%s = EXCLUDED.%s", quoted, quoted); + } + + index_close(indexRel, NoLock); + + resetStringInfo(&buf); + + if (has_non_key_cols) + { + appendStringInfo(&buf, + "WITH new_data AS MATERIALIZED ( " + " SELECT * FROM (%s) %s WHERE (%s) " + "), " + "upsert AS ( " + " INSERT INTO %s SELECT * FROM new_data " + " ON CONFLICT (%s) DO UPDATE SET %s " + ") " + "DELETE FROM %s mv WHERE (%s) AND NOT EXISTS ( " + " SELECT 1 FROM new_data nd WHERE %s" + ")", + view_sql, matview_alias, whereClauseStr, + matview_name, conflict_cols.data, set_clause.data, + matview_name, whereClauseStr, join_clause.data); + } + else + { + appendStringInfo(&buf, + "WITH new_data AS MATERIALIZED ( " + " SELECT * FROM (%s) %s WHERE (%s) " + "), " + "upsert AS ( " + " INSERT INTO %s SELECT * FROM new_data " + " ON CONFLICT (%s) DO NOTHING " + ") " + "DELETE FROM %s mv WHERE (%s) AND NOT EXISTS ( " + " SELECT 1 FROM new_data nd WHERE %s" + ")", + view_sql, matview_alias, whereClauseStr, + matview_name, conflict_cols.data, + matview_name, whereClauseStr, join_clause.data); + } + + cacheEntry->refreshPlan = SPI_prepare(buf.data, nargs, argtypes); + if (cacheEntry->refreshPlan == NULL) + elog(ERROR, "SPI_prepare failed for refresh CTE: %s", buf.data); + SPI_keepplan(cacheEntry->refreshPlan); + + /* Save cache metadata in a long-lived context. 
*/ + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + cacheEntry->uniqueIndexOid = uniqueIndexOid; + cacheEntry->whereClauseStr = pstrdup(whereClauseStr); + MemoryContextSwitchTo(oldcxt); + + pfree(matview_name); + pfree(view_sql); + pfree(buf.data); + pfree(conflict_cols.data); + pfree(set_clause.data); + pfree(join_clause.data); + if (argtypes != NULL) + pfree(argtypes); + } + + + /* Execute: lock matching rows, then run the refresh CTE. */ + if (matview_execute_spi_plan(cacheEntry->lockPlan, params, false) < 0) + elog(ERROR, "SPI_execute_plan failed during lock acquisition"); + + if (matview_execute_spi_plan(cacheEntry->refreshPlan, params, false) < 0) + elog(ERROR, "SPI_execute_plan failed during refresh"); + + result_processed = SPI_processed; + + SPI_finish(); + CloseMatViewIncrementalMaintenance(); + table_close(matviewRel, NoLock); + + return result_processed; +} + /* * refresh_by_match_merge * @@ -572,6 +1217,10 @@ transientrel_destroy(DestReceiver *self) * are consistent with default behavior. If there is at least one UNIQUE * index on the materialized view, we have exactly the guarantee we need. * + * If whereClauseStr is provided, only rows matching the WHERE condition + * in the existing matview are considered for the diff operation, enabling + * partial concurrent refresh. + * * The temporary table used to hold the diff results contains just the TID of * the old record (if matched) and the ROW from the new table as a single * column of complex record type (if matched). 
@@ -589,7 +1238,8 @@ transientrel_destroy(DestReceiver *self)
  */
 static void
 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
-					   int save_sec_context)
+					   int save_sec_context, char *whereClauseStr,
+					   ParamListInfo params)
 {
 	StringInfoData querybuf;
 	Relation matviewRel;
@@ -703,8 +1353,15 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 	appendStringInfo(&querybuf,
 					 "INSERT INTO %s "
 					 "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
-					 "FROM %s mv FULL JOIN %s newdata ON (",
-					 diffname, tempname, matviewname, tempname);
+					 "FROM ",
+					 diffname, tempname);
+
+	if (whereClauseStr)
+		appendStringInfo(&querybuf, "(SELECT ctid, * FROM %s WHERE %s) mv", matviewname, whereClauseStr);
+	else
+		appendStringInfo(&querybuf, "%s mv", matviewname);
+
+	appendStringInfo(&querybuf, " FULL JOIN %s newdata ON (", tempname);

 	/*
 	 * Get the list of index OIDs for the table from the relcache, and look up
@@ -826,13 +1483,42 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 				 errmsg("could not find suitable unique index on materialized view \"%s\"",
 						RelationGetRelationName(matviewRel)));

-	appendStringInfoString(&querybuf,
-						   " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
-						   "WHERE newdata.* IS NULL OR mv.* IS NULL "
-						   "ORDER BY tid");
+	if (whereClauseStr)
+	{
+		StringInfoData cols;
+		int i;
+		bool first = true;
+
+		initStringInfo(&cols);
+		for (i = 0; i < relnatts; i++)
+		{
+			Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
+
+			if (attr->attisdropped)
+				continue;
+			if (!first)
+				appendStringInfoString(&cols, ", ");
+			first = false;
+			appendStringInfo(&cols, "mv.%s", quote_qualified_identifier(NULL, NameStr(attr->attname)));
+		}
+
+		appendStringInfo(&querybuf,
+						 " AND newdata.* OPERATOR(pg_catalog.*=) ROW(%s)) "
+						 "WHERE newdata.* IS NULL OR mv.ctid IS NULL "
+						 "ORDER BY tid",
+						 cols.data);
+		pfree(cols.data);
+	}
+	else
+	{
+		appendStringInfoString(&querybuf,
+							   " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
+							   "WHERE newdata.* IS NULL OR mv.* IS NULL "
+							   "ORDER BY tid");
+	}

 	/* Populate the temporary "diff" table. */
-	if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
+	if (matview_execute_spi(querybuf.data, params, false) != SPI_OK_INSERT)
 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);

 	/*
@@ -898,7 +1584,8 @@ refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
 }

 /*
- * Check whether specified index is usable for match merge.
+ * Check whether the specified index is usable for refresh_by_match_merge
+ * or refresh_by_direct_modification.
  */
 static bool
 is_usable_unique_index(Relation indexRel)
@@ -946,8 +1633,10 @@ is_usable_unique_index(Relation indexRel)
  *
  * While the function names reflect the fact that their main intended use is
  * incremental maintenance of materialized views (in response to changes to
- * the data in referenced relations), they are initially used to allow REFRESH
- * without blocking concurrent reads.
+ * the data in referenced relations), they are currently used to allow:
+ *
+ * - REFRESH CONCURRENTLY without blocking concurrent reads.
+ * - REFRESH ... WHERE ... which modifies the matview in-place.
  */
 bool
 MatViewIncrementalMaintenanceIsEnabled(void)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4b30f768680..3380f5e68e1 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1229,7 +1229,8 @@ CheckValidRowMarkRel(Relation rel, RowMarkType markType)
 			break;
 		case RELKIND_MATVIEW:
 			/* Allow referencing a matview, but not actual locking clauses */
-			if (markType != ROW_MARK_REFERENCE)
+			if (markType != ROW_MARK_REFERENCE &&
+				!MatViewIncrementalMaintenanceIsEnabled())
 				ereport(ERROR,
 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 						 errmsg("cannot lock rows in materialized view \"%s\"",
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 2d1b19d1f53..b587967ec15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -326,6 +326,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str> opt_single_name
 %type <list> opt_qualified_name
 %type <boolean> opt_concurrently opt_usingindex
+%type <node> opt_refresh_where_clause
 %type <dbehavior> opt_drop_behavior
 %type <list> opt_utility_option_list
 %type <list> opt_wait_with_clause
@@ -5097,17 +5098,28 @@ OptNoLog: UNLOGGED { $$ = RELPERSISTENCE_UNLOGGED; }
  *****************************************************************************/

 RefreshMatViewStmt:
-			REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data
+			REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data opt_refresh_where_clause
 				{
 					RefreshMatViewStmt *n = makeNode(RefreshMatViewStmt);

 					n->concurrent = $4;
 					n->relation = $5;
 					n->skipData = !($6);
+					n->whereClause = $7;
+
+					if (n->skipData && n->whereClause)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("cannot specify WHERE clause with WITH NO DATA")));
 					$$ = (Node *) n;
 				}
 		;

+opt_refresh_where_clause:
+			WHERE a_expr { $$ = $2; }
+			| /* empty */ { $$ = NULL; }
+		;
+

 /*****************************************************************************
  *
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1d34c19913e..56ae3bd2f94 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1694,7 +1694,7 @@ ProcessUtilitySlow(ParseState *pstate,
 				PG_TRY(2);
 				{
 					address = ExecRefreshMatView((RefreshMatViewStmt *) parsetree,
-												 queryString, qc);
+												 queryString, params, qc);
 				}
 				PG_FINALLY(2);
 				{
diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h
index 738c731c1a9..ff46542d66c 100644
--- a/src/include/commands/matview.h
+++ b/src/include/commands/matview.h
@@ -24,9 +24,11 @@ extern void SetMatViewPopulatedState(Relation relation, bool newstate);
 extern ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
-										QueryCompletion *qc);
+										ParamListInfo params, QueryCompletion *qc);
+
 extern ObjectAddress RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData,
-										 bool concurrent, const char *queryString,
+										 bool concurrent, Node *whereClause,
+										 const char *queryString, ParamListInfo params,
 										 QueryCompletion *qc);
 extern DestReceiver *CreateTransientRelDestReceiver(Oid transientoid);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 91377a6cde3..3fc088fd262 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4159,6 +4159,7 @@ typedef struct RefreshMatViewStmt
 	bool concurrent; /* allow concurrent access? */
 	bool skipData; /* true for WITH NO DATA */
 	RangeVar *relation; /* relation to insert into */
+	Node *whereClause; /* qualification for partial refresh */
 } RefreshMatViewStmt;

 /* ----------------------
diff --git a/src/test/regress/expected/matview_where.out b/src/test/regress/expected/matview_where.out
new file mode 100644
index 00000000000..75b02e84a6a
--- /dev/null
+++ b/src/test/regress/expected/matview_where.out
@@ -0,0 +1,344 @@
+--
+-- REFRESH MATERIALIZED VIEW ... WHERE ...
+--
+-- Setup
+CREATE TABLE mv_base_a (id int primary key, val text);
+INSERT INTO mv_base_a VALUES (1, 'One'), (2, 'Two'), (3, 'Three');
+CREATE MATERIALIZED VIEW mv_test_a AS SELECT * FROM mv_base_a;
+CREATE UNIQUE INDEX ON mv_test_a(id);
+--
+-- Test 1: Syntax and Error handling
+--
+-- 1.1 WITH NO DATA + WHERE -> Error
+REFRESH MATERIALIZED VIEW mv_test_a WITH NO DATA WHERE id = 1;
+ERROR:  cannot specify WHERE clause with WITH NO DATA
+-- 1.2 Unpopulated + WHERE -> Error
+CREATE MATERIALIZED VIEW mv_unpop AS SELECT * FROM mv_base_a WITH NO DATA;
+REFRESH MATERIALIZED VIEW mv_unpop WHERE id = 1;
+ERROR:  WHERE clause cannot be used when the materialized view is not populated
+DROP MATERIALIZED VIEW mv_unpop;
+-- 1.3 Volatile functions -> Error
+REFRESH MATERIALIZED VIEW mv_test_a WHERE random() > 0.5;
+ERROR:  WHERE clause in REFRESH MATERIALIZED VIEW cannot contain volatile functions
+-- 1.4 Aggregates -> Error
+REFRESH MATERIALIZED VIEW mv_test_a WHERE count(*) > 0;
+ERROR:  aggregate functions are not allowed in WHERE
+--
+-- Test 2: Non-concurrent Partial Refresh
+--
+-- Modify base data
+UPDATE mv_base_a SET val = 'One Updated' WHERE id = 1;
+UPDATE mv_base_a SET val = 'Two Updated' WHERE id = 2;
+-- Refresh only id=1
+REFRESH MATERIALIZED VIEW mv_test_a WHERE id = 1;
+-- Verify: id=1 should be updated, id=2 should remain stale
+SELECT * FROM mv_test_a ORDER BY id;
+ id |     val     
+----+-------------
+  1 | One Updated
+  2 | Two
+  3 | Three
+(3 rows)
+
+-- Refresh id=2
+REFRESH MATERIALIZED VIEW mv_test_a WHERE id = 2;
+SELECT * FROM mv_test_a ORDER BY id;
+ id |     val     
+----+-------------
+  1 | One Updated
+  2 | Two Updated
+  3 | Three
+(3 rows)
+
+--
+-- Test 3: Concurrent Partial Refresh
+--
+-- Modify base data
+UPDATE mv_base_a SET val = 'One Concurrent' WHERE id = 1;
+UPDATE mv_base_a SET val = 'Two Concurrent' WHERE id = 2;
+-- Refresh only id=1
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_test_a WHERE id = 1;
+-- Verify: id=1 updated, id=2 stale
+SELECT * FROM mv_test_a ORDER BY id;
+ id |      val       
+----+----------------
+  1 | One Concurrent
+  2 | Two Updated
+  3 | Three
+(3 rows)
+
+-- Refresh id=2
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_test_a WHERE id = 2;
+SELECT * FROM mv_test_a ORDER BY id;
+ id |      val       
+----+----------------
+  1 | One Concurrent
+  2 | Two Concurrent
+  3 | Three
+(3 rows)
+
+-- Cleanup Test 2/3
+DROP MATERIALIZED VIEW mv_test_a;
+DROP TABLE mv_base_a;
+--
+-- Test 4: Join View (Invoice style)
+--
+CREATE TABLE invoices (id int primary key, total numeric);
+CREATE TABLE invoice_items (inv_id int references invoices(id), amount numeric);
+INSERT INTO invoices VALUES (1, 0), (2, 0);
+INSERT INTO invoice_items VALUES (1, 100), (1, 50), (2, 200);
+CREATE MATERIALIZED VIEW mv_invoices AS
+    SELECT i.id, sum(ii.amount) as computed_total
+    FROM invoices i
+    JOIN invoice_items ii ON i.id = ii.inv_id
+    GROUP BY i.id;
+CREATE UNIQUE INDEX ON mv_invoices(id);
+SELECT * FROM mv_invoices ORDER BY id;
+ id | computed_total 
+----+----------------
+  1 |            150
+  2 |            200
+(2 rows)
+
+-- Modify items for invoice 1
+INSERT INTO invoice_items VALUES (1, 25);
+-- Modify items for invoice 2
+INSERT INTO invoice_items VALUES (2, 50);
+-- Refresh only invoice 1
+REFRESH MATERIALIZED VIEW mv_invoices WHERE id = 1;
+-- Verify: Invoice 1 updated (175), Invoice 2 stale (200)
+SELECT * FROM mv_invoices ORDER BY id;
+ id | computed_total 
+----+----------------
+  1 |            175
+  2 |            200
+(2 rows)
+
+-- Refresh invoice 2 concurrently
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_invoices WHERE id = 2;
+-- Verify: Invoice 2 updated (250)
+SELECT * FROM mv_invoices ORDER BY id;
+ id | computed_total 
+----+----------------
+  1 |            175
+  2 |            250
+(2 rows)
+
+DROP MATERIALIZED VIEW mv_invoices;
+DROP TABLE invoice_items;
+DROP TABLE invoices;
+--
+-- Test 5: Rows entering/leaving view scope
+--
+CREATE TABLE items (id int, status text, val int);
+INSERT INTO items VALUES (1, 'active', 10), (2, 'inactive', 20);
+CREATE MATERIALIZED VIEW mv_active_items AS
+    SELECT * FROM items WHERE status = 'active';
+CREATE UNIQUE INDEX ON mv_active_items(id);
+SELECT * FROM mv_active_items ORDER BY id;
+ id | status | val 
+----+--------+-----
+  1 | active |  10
+(1 row)
+
+-- Case A: Row changes status active -> inactive (should be removed)
+UPDATE items SET status = 'inactive' WHERE id = 1;
+-- Also update row 2 to active (should be added)
+UPDATE items SET status = 'active' WHERE id = 2;
+-- Refresh partial WHERE id=1
+-- Should remove id=1 because it no longer matches view definition
+REFRESH MATERIALIZED VIEW mv_active_items WHERE id = 1;
+SELECT * FROM mv_active_items ORDER BY id;
+ id | status | val 
+----+--------+-----
+(0 rows)
+
+-- Case B: Refresh to add row 2 (which is now active)
+REFRESH MATERIALIZED VIEW mv_active_items WHERE id = 2;
+SELECT * FROM mv_active_items ORDER BY id;
+ id | status | val 
+----+--------+-----
+  2 | active |  20
+(1 row)
+
+-- Cleanup
+DROP MATERIALIZED VIEW mv_active_items;
+DROP TABLE items;
+--
+-- Test 6: Order of Operations (Value Swap)
+-- Addressed specific worry: "The order of tuple processing matters"
+--
+CREATE TABLE mv_swap_base (id int primary key, code text);
+INSERT INTO mv_swap_base VALUES (1, 'A'), (2, 'B');
+CREATE MATERIALIZED VIEW mv_swap AS SELECT * FROM mv_swap_base;
+CREATE UNIQUE INDEX ON mv_swap(code); -- Unique Index is on code, not ID
+SELECT * FROM mv_swap ORDER BY id;
+ id | code 
+----+------
+  1 | A
+  2 | B
+(2 rows)
+
+-- Perform a swap in the base table
+-- 1 becomes B, 2 becomes A
+BEGIN;
+UPDATE mv_swap_base SET code = 'TEMP' WHERE id = 1;
+UPDATE mv_swap_base SET code = 'A' WHERE id = 2;
+UPDATE mv_swap_base SET code = 'B' WHERE id = 1;
+COMMIT;
+-- Refresh both rows concurrently.
+-- If the implementation inserts (1, 'B') before deleting (2, 'B'), this will fail.
+-- It relies on the implementation correctly handling the Delete/Lock set before the Insert/Upsert.
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_swap WHERE id IN (1, 2);
+SELECT * FROM mv_swap ORDER BY id;
+ id | code 
+----+------
+  1 | B
+  2 | A
+(2 rows)
+
+DROP MATERIALIZED VIEW mv_swap;
+DROP TABLE mv_swap_base;
+--
+-- Test 7: Scope Drift / Constraint Violation
+-- Addressed specific worry: "If WHERE predicate would be different... UK violation couldn't be solved"
+--
+CREATE TABLE mv_drift_base (id int primary key, category_id int);
+INSERT INTO mv_drift_base VALUES (1, 100), (2, 200);
+CREATE MATERIALIZED VIEW mv_drift AS SELECT * FROM mv_drift_base;
+-- KEY FIX: Index on ID, not Category.
+-- We want to test that the Refresh logic detects ID conflicts when rows drift into scope,
+-- not that Postgres enforces unique indexes on non-unique data.
+CREATE UNIQUE INDEX ON mv_drift(id);
+-- Update Row 1 to collide with Row 2's category
+UPDATE mv_drift_base SET category_id = 200 WHERE id = 1;
+-- Attempt to refresh using the NEW category value as the filter.
+-- The View still contains (1, 100).
+-- The Filter "category_id = 200" sees the NEW row (1, 200) in the base table.
+-- The Filter "category_id = 200" does NOT see the OLD row (1, 100) in the View.
+-- Result: The system thinks (1, 200) is a brand new row and tries to INSERT it.
+-- This MUST fail with a Unique Constraint Violation on 'id' because (1, 100) was never deleted.
+\set VERBOSITY terse
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_drift WHERE category_id = 200;
+ERROR:  duplicate key value violates unique constraint "mv_drift_id_idx"
+\set VERBOSITY default
+-- Correct usage: Scope must include BOTH the Old location (100) and New location (200)
+-- so the system sees the update as an update (or delete+insert).
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_drift WHERE category_id IN (100, 200);
+SELECT * FROM mv_drift ORDER BY id;
+ id | category_id 
+----+-------------
+  1 |         200
+  2 |         200
+(2 rows)
+
+DROP MATERIALIZED VIEW mv_drift;
+DROP TABLE mv_drift_base;
+--
+-- Test 8: Multiple Unique Keys
+-- Addressed specific worry: "what if we have multiple UKs?"
+--
+CREATE TABLE mv_multi_uk (id int primary key, email text, username text);
+INSERT INTO mv_multi_uk VALUES (1, '[email protected]', 'user_a');
+CREATE MATERIALIZED VIEW mv_multi AS SELECT * FROM mv_multi_uk;
+CREATE UNIQUE INDEX ON mv_multi(email);
+CREATE UNIQUE INDEX ON mv_multi(username);
+-- Update all columns
+UPDATE mv_multi_uk SET email = '[email protected]', username = 'user_b' WHERE id = 1;
+-- Refresh should succeed updating all unique indexes
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_multi WHERE id = 1;
+SELECT * FROM mv_multi;
+ id |     email     | username 
+----+---------------+----------
+  1 | [email protected] | user_b
+(1 row)
+
+DROP MATERIALIZED VIEW mv_multi;
+DROP TABLE mv_multi_uk;
+--
+-- Test 9: Trigger-based Automatic Maintenance
+-- Use Case: Automating the partial refresh via triggers using Arrays.
+--
+CREATE TABLE mv_trigger_base (id int primary key, val text);
+CREATE MATERIALIZED VIEW mv_trigger_view AS SELECT * FROM mv_trigger_base;
+CREATE UNIQUE INDEX ON mv_trigger_view(id);
+-- Create a maintainer function
+CREATE OR REPLACE FUNCTION maintain_mv_trigger_view() RETURNS TRIGGER AS $$
+BEGIN
+    IF (TG_OP IN ('INSERT', 'UPDATE')) THEN
+        EXECUTE 'REFRESH MATERIALIZED VIEW mv_trigger_view WHERE id = ANY($1);'
+            USING (SELECT array_agg(id) FROM new_table);
+    END IF;
+
+    IF (TG_OP IN ('DELETE')) THEN
+        EXECUTE 'REFRESH MATERIALIZED VIEW mv_trigger_view WHERE id = ANY($1);'
+            USING (SELECT array_agg(id) FROM old_table);
+    END IF;
+
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql VOLATILE;
+-- Trigger for Insert
+CREATE TRIGGER t_refresh_mv_ins
+    AFTER INSERT ON mv_trigger_base
+    REFERENCING NEW TABLE AS new_table
+    FOR EACH STATEMENT
+EXECUTE FUNCTION maintain_mv_trigger_view();
+-- Trigger for Update
+CREATE TRIGGER t_refresh_mv_upd
+    AFTER UPDATE ON mv_trigger_base
+    REFERENCING NEW TABLE AS new_table
+    FOR EACH STATEMENT
+EXECUTE FUNCTION maintain_mv_trigger_view();
+-- Trigger for Delete
+CREATE TRIGGER t_refresh_mv_del
+    AFTER DELETE ON mv_trigger_base
+    REFERENCING OLD TABLE AS old_table
+    FOR EACH STATEMENT
+EXECUTE FUNCTION maintain_mv_trigger_view();
+-- 1. Test Insert
+INSERT INTO mv_trigger_base VALUES (1, 'Auto-Insert'), (2, 'Auto-Insert');
+SELECT * FROM mv_trigger_view ORDER BY id;
+ id |     val     
+----+-------------
+  1 | Auto-Insert
+  2 | Auto-Insert
+(2 rows)
+
+-- 2. Test Update
+UPDATE mv_trigger_base SET val = 'Auto-Update' WHERE id = 1;
+SELECT * FROM mv_trigger_view ORDER BY id;
+ id |     val     
+----+-------------
+  1 | Auto-Update
+  2 | Auto-Insert
+(2 rows)
+
+-- 3. Test Delete
+DELETE FROM mv_trigger_base WHERE id = 2;
+SELECT * FROM mv_trigger_view ORDER BY id;
+ id |     val     
+----+-------------
+  1 | Auto-Update
+(1 row)
+
+-- 4. Verify Transaction Isolation
+-- Ensure that if the main transaction rolls back, the Refresh also rolls back
+BEGIN;
+INSERT INTO mv_trigger_base VALUES (99, 'Rollback');
+SELECT * FROM mv_trigger_view WHERE id = 99; -- Should see it
+ id |   val    
+----+----------
+ 99 | Rollback
+(1 row)
+
+ROLLBACK;
+SELECT * FROM mv_trigger_view WHERE id = 99; -- Should NOT see it
+ id | val 
+----+-----
+(0 rows)
+
+-- Cleanup
+DROP MATERIALIZED VIEW mv_trigger_view;
+DROP TABLE mv_trigger_base;
+DROP FUNCTION maintain_mv_trigger_view();
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index cc365393bb7..738442d9a32 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -68,6 +68,11 @@ test: select_into select_distinct select_distinct_on select_implicit select_havi
 # ----------
 test: brin gin gist spgist privileges init_privs security_label collate matview lock replica_identity rowsecurity object_address tablesample groupingsets drop_operator password identity generated_stored join_hash

+# ----------
+# Additional Mat View tests
+# ----------
+test: matview_where
+
 # ----------
 # Additional BRIN tests
 # ----------
diff --git a/src/test/regress/sql/matview_where.sql b/src/test/regress/sql/matview_where.sql
new file mode 100644
index 00000000000..c3faceed27d
--- /dev/null
+++ b/src/test/regress/sql/matview_where.sql
@@ -0,0 +1,302 @@
+--
+-- REFRESH MATERIALIZED VIEW ... WHERE ...
+--
+
+-- Setup
+CREATE TABLE mv_base_a (id int primary key, val text);
+INSERT INTO mv_base_a VALUES (1, 'One'), (2, 'Two'), (3, 'Three');
+
+CREATE MATERIALIZED VIEW mv_test_a AS SELECT * FROM mv_base_a;
+CREATE UNIQUE INDEX ON mv_test_a(id);
+
+--
+-- Test 1: Syntax and Error handling
+--
+
+-- 1.1 WITH NO DATA + WHERE -> Error
+REFRESH MATERIALIZED VIEW mv_test_a WITH NO DATA WHERE id = 1;
+
+-- 1.2 Unpopulated + WHERE -> Error
+CREATE MATERIALIZED VIEW mv_unpop AS SELECT * FROM mv_base_a WITH NO DATA;
+REFRESH MATERIALIZED VIEW mv_unpop WHERE id = 1;
+DROP MATERIALIZED VIEW mv_unpop;
+
+-- 1.3 Volatile functions -> Error
+REFRESH MATERIALIZED VIEW mv_test_a WHERE random() > 0.5;
+
+-- 1.4 Aggregates -> Error
+REFRESH MATERIALIZED VIEW mv_test_a WHERE count(*) > 0;
+
+--
+-- Test 2: Non-concurrent Partial Refresh
+--
+
+-- Modify base data
+UPDATE mv_base_a SET val = 'One Updated' WHERE id = 1;
+UPDATE mv_base_a SET val = 'Two Updated' WHERE id = 2;
+
+-- Refresh only id=1
+REFRESH MATERIALIZED VIEW mv_test_a WHERE id = 1;
+
+-- Verify: id=1 should be updated, id=2 should remain stale
+SELECT * FROM mv_test_a ORDER BY id;
+
+-- Refresh id=2
+REFRESH MATERIALIZED VIEW mv_test_a WHERE id = 2;
+SELECT * FROM mv_test_a ORDER BY id;
+
+--
+-- Test 3: Concurrent Partial Refresh
+--
+
+-- Modify base data
+UPDATE mv_base_a SET val = 'One Concurrent' WHERE id = 1;
+UPDATE mv_base_a SET val = 'Two Concurrent' WHERE id = 2;
+
+-- Refresh only id=1
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_test_a WHERE id = 1;
+
+-- Verify: id=1 updated, id=2 stale
+SELECT * FROM mv_test_a ORDER BY id;
+
+-- Refresh id=2
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_test_a WHERE id = 2;
+SELECT * FROM mv_test_a ORDER BY id;
+
+-- Cleanup Test 2/3
+DROP MATERIALIZED VIEW mv_test_a;
+DROP TABLE mv_base_a;
+
+--
+-- Test 4: Join View (Invoice style)
+--
+
+CREATE TABLE invoices (id int primary key, total numeric);
+CREATE TABLE invoice_items (inv_id int references invoices(id), amount numeric);
+
+INSERT INTO invoices VALUES (1, 0), (2, 0);
+INSERT INTO invoice_items VALUES (1, 100), (1, 50), (2, 200);
+
+CREATE MATERIALIZED VIEW mv_invoices AS
+    SELECT i.id, sum(ii.amount) as computed_total
+    FROM invoices i
+    JOIN invoice_items ii ON i.id = ii.inv_id
+    GROUP BY i.id;
+
+CREATE UNIQUE INDEX ON mv_invoices(id);
+
+SELECT * FROM mv_invoices ORDER BY id;
+
+-- Modify items for invoice 1
+INSERT INTO invoice_items VALUES (1, 25);
+-- Modify items for invoice 2
+INSERT INTO invoice_items VALUES (2, 50);
+
+-- Refresh only invoice 1
+REFRESH MATERIALIZED VIEW mv_invoices WHERE id = 1;
+
+-- Verify: Invoice 1 updated (175), Invoice 2 stale (200)
+SELECT * FROM mv_invoices ORDER BY id;
+
+-- Refresh invoice 2 concurrently
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_invoices WHERE id = 2;
+-- Verify: Invoice 2 updated (250)
+SELECT * FROM mv_invoices ORDER BY id;
+
+DROP MATERIALIZED VIEW mv_invoices;
+DROP TABLE invoice_items;
+DROP TABLE invoices;
+
+--
+-- Test 5: Rows entering/leaving view scope
+--
+
+CREATE TABLE items (id int, status text, val int);
+INSERT INTO items VALUES (1, 'active', 10), (2, 'inactive', 20);
+
+CREATE MATERIALIZED VIEW mv_active_items AS
+    SELECT * FROM items WHERE status = 'active';
+
+CREATE UNIQUE INDEX ON mv_active_items(id);
+
+SELECT * FROM mv_active_items ORDER BY id;
+
+-- Case A: Row changes status active -> inactive (should be removed)
+UPDATE items SET status = 'inactive' WHERE id = 1;
+-- Also update row 2 to active (should be added)
+UPDATE items SET status = 'active' WHERE id = 2;
+
+-- Refresh partial WHERE id=1
+-- Should remove id=1 because it no longer matches view definition
+REFRESH MATERIALIZED VIEW mv_active_items WHERE id = 1;
+SELECT * FROM mv_active_items ORDER BY id;
+
+-- Case B: Refresh to add row 2 (which is now active)
+REFRESH MATERIALIZED VIEW mv_active_items WHERE id = 2;
+SELECT * FROM mv_active_items ORDER BY id;
+
+-- Cleanup
+DROP MATERIALIZED VIEW mv_active_items;
+DROP TABLE items;
+
+--
+-- Test 6: Order of Operations (Value Swap)
+-- Addressed specific worry: "The order of tuple processing matters"
+--
+
+CREATE TABLE mv_swap_base (id int primary key, code text);
+INSERT INTO mv_swap_base VALUES (1, 'A'), (2, 'B');
+
+CREATE MATERIALIZED VIEW mv_swap AS SELECT * FROM mv_swap_base;
+CREATE UNIQUE INDEX ON mv_swap(code); -- Unique Index is on code, not ID
+
+SELECT * FROM mv_swap ORDER BY id;
+
+-- Perform a swap in the base table
+-- 1 becomes B, 2 becomes A
+BEGIN;
+UPDATE mv_swap_base SET code = 'TEMP' WHERE id = 1;
+UPDATE mv_swap_base SET code = 'A' WHERE id = 2;
+UPDATE mv_swap_base SET code = 'B' WHERE id = 1;
+COMMIT;
+
+-- Refresh both rows concurrently.
+-- If the implementation inserts (1, 'B') before deleting (2, 'B'), this will fail.
+-- It relies on the implementation correctly handling the Delete/Lock set before the Insert/Upsert.
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_swap WHERE id IN (1, 2);
+
+SELECT * FROM mv_swap ORDER BY id;
+
+DROP MATERIALIZED VIEW mv_swap;
+DROP TABLE mv_swap_base;
+
+--
+-- Test 7: Scope Drift / Constraint Violation
+-- Addressed specific worry: "If WHERE predicate would be different... UK violation couldn't be solved"
+--
+
+CREATE TABLE mv_drift_base (id int primary key, category_id int);
+INSERT INTO mv_drift_base VALUES (1, 100), (2, 200);
+
+CREATE MATERIALIZED VIEW mv_drift AS SELECT * FROM mv_drift_base;
+-- KEY FIX: Index on ID, not Category.
+-- We want to test that the Refresh logic detects ID conflicts when rows drift into scope,
+-- not that Postgres enforces unique indexes on non-unique data.
+CREATE UNIQUE INDEX ON mv_drift(id);
+
+-- Update Row 1 to collide with Row 2's category
+UPDATE mv_drift_base SET category_id = 200 WHERE id = 1;
+
+-- Attempt to refresh using the NEW category value as the filter.
+-- The View still contains (1, 100).
+-- The Filter "category_id = 200" sees the NEW row (1, 200) in the base table.
+-- The Filter "category_id = 200" does NOT see the OLD row (1, 100) in the View.
+-- Result: The system thinks (1, 200) is a brand new row and tries to INSERT it.
+-- This MUST fail with a Unique Constraint Violation on 'id' because (1, 100) was never deleted.
+\set VERBOSITY terse
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_drift WHERE category_id = 200;
+\set VERBOSITY default
+
+-- Correct usage: Scope must include BOTH the Old location (100) and New location (200)
+-- so the system sees the update as an update (or delete+insert).
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_drift WHERE category_id IN (100, 200);
+
+SELECT * FROM mv_drift ORDER BY id;
+
+DROP MATERIALIZED VIEW mv_drift;
+DROP TABLE mv_drift_base;
+
+--
+-- Test 8: Multiple Unique Keys
+-- Addressed specific worry: "what if we have multiple UKs?"
+--
+
+CREATE TABLE mv_multi_uk (id int primary key, email text, username text);
+INSERT INTO mv_multi_uk VALUES (1, '[email protected]', 'user_a');
+
+CREATE MATERIALIZED VIEW mv_multi AS SELECT * FROM mv_multi_uk;
+CREATE UNIQUE INDEX ON mv_multi(email);
+CREATE UNIQUE INDEX ON mv_multi(username);
+
+-- Update all columns
+UPDATE mv_multi_uk SET email = '[email protected]', username = 'user_b' WHERE id = 1;
+
+-- Refresh should succeed updating all unique indexes
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv_multi WHERE id = 1;
+
+SELECT * FROM mv_multi;
+
+DROP MATERIALIZED VIEW mv_multi;
+DROP TABLE mv_multi_uk;
+
+--
+-- Test 9: Trigger-based Automatic Maintenance
+-- Use Case: Automating the partial refresh via triggers using Arrays.
+--
+
+CREATE TABLE mv_trigger_base (id int primary key, val text);
+CREATE MATERIALIZED VIEW mv_trigger_view AS SELECT * FROM mv_trigger_base;
+CREATE UNIQUE INDEX ON mv_trigger_view(id);
+
+-- Create a maintainer function
+CREATE OR REPLACE FUNCTION maintain_mv_trigger_view() RETURNS TRIGGER AS $$
+BEGIN
+    IF (TG_OP IN ('INSERT', 'UPDATE')) THEN
+        EXECUTE 'REFRESH MATERIALIZED VIEW mv_trigger_view WHERE id = ANY($1);'
+            USING (SELECT array_agg(id) FROM new_table);
+    END IF;
+
+    IF (TG_OP IN ('DELETE')) THEN
+        EXECUTE 'REFRESH MATERIALIZED VIEW mv_trigger_view WHERE id = ANY($1);'
+            USING (SELECT array_agg(id) FROM old_table);
+    END IF;
+
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql VOLATILE;
+
+-- Trigger for Insert
+CREATE TRIGGER t_refresh_mv_ins
+    AFTER INSERT ON mv_trigger_base
+    REFERENCING NEW TABLE AS new_table
+    FOR EACH STATEMENT
+EXECUTE FUNCTION maintain_mv_trigger_view();
+
+-- Trigger for Update
+CREATE TRIGGER t_refresh_mv_upd
+    AFTER UPDATE ON mv_trigger_base
+    REFERENCING NEW TABLE AS new_table
+    FOR EACH STATEMENT
+EXECUTE FUNCTION maintain_mv_trigger_view();
+
+-- Trigger for Delete
+CREATE TRIGGER t_refresh_mv_del
+    AFTER DELETE ON mv_trigger_base
+    REFERENCING OLD TABLE AS old_table
+    FOR EACH STATEMENT
+EXECUTE FUNCTION maintain_mv_trigger_view();
+
+-- 1. Test Insert
+INSERT INTO mv_trigger_base VALUES (1, 'Auto-Insert'), (2, 'Auto-Insert');
+SELECT * FROM mv_trigger_view ORDER BY id;
+
+-- 2. Test Update
+UPDATE mv_trigger_base SET val = 'Auto-Update' WHERE id = 1;
+SELECT * FROM mv_trigger_view ORDER BY id;
+
+-- 3. Test Delete
+DELETE FROM mv_trigger_base WHERE id = 2;
+SELECT * FROM mv_trigger_view ORDER BY id;
+
+-- 4. Verify Transaction Isolation
+-- Ensure that if the main transaction rolls back, the Refresh also rolls back
+BEGIN;
+INSERT INTO mv_trigger_base VALUES (99, 'Rollback');
+SELECT * FROM mv_trigger_view WHERE id = 99; -- Should see it
+ROLLBACK;
+SELECT * FROM mv_trigger_view WHERE id = 99; -- Should NOT see it
+
+-- Cleanup
+DROP MATERIALIZED VIEW mv_trigger_view;
+DROP TABLE mv_trigger_base;
+DROP FUNCTION maintain_mv_trigger_view();
-- 
2.34.1
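
For reviewers who want to check the indnatts vs indnkeyatts handling in isolation, here is a minimal standalone sketch of the column classification the non-concurrent path relies on. This is not code from the patch: `SketchColumn`, `build_conflict_target` and `build_set_clause` are hypothetical names, plain arrays stand in for the TupleDesc and pg_index fields (`indkey`, `indnkeyatts`), and identifier quoting (`quote_identifier` in the patch) is left out. Only the first `nkeyatts` entries of `indkey` form the ON CONFLICT target. Every other live column, INCLUDE columns included, gets `col = EXCLUDED.col`, and an empty SET list corresponds to the patch's `DO NOTHING` branch.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for one TupleDesc entry (not a PostgreSQL type). */
typedef struct
{
    const char *name;
    bool        dropped;
} SketchColumn;

/*
 * ON CONFLICT target: only the first nkeyatts entries of indkey are key
 * columns; the entries after that are INCLUDE columns.  indkey holds 1-based
 * attribute numbers and is assumed to reference plain columns only
 * (no expression columns, i.e. no attnum 0).
 */
static void
build_conflict_target(const SketchColumn *cols, const int *indkey,
                      int nkeyatts, char *out, size_t outlen)
{
    out[0] = '\0';
    for (int j = 0; j < nkeyatts; j++)
    {
        size_t      used = strlen(out);

        snprintf(out + used, outlen - used, "%s%s",
                 j == 0 ? "" : ", ", cols[indkey[j] - 1].name);
    }
}

/*
 * SET list: "col = EXCLUDED.col" for every live column that is not one of
 * the first nkeyatts index columns.  Returns false when no such column
 * exists, i.e. the caller should emit ON CONFLICT ... DO NOTHING.
 */
static bool
build_set_clause(const SketchColumn *cols, int ncols, const int *indkey,
                 int nkeyatts, char *out, size_t outlen)
{
    bool        first = true;

    out[0] = '\0';
    for (int i = 0; i < ncols; i++)
    {
        bool        is_key = false;
        size_t      used = strlen(out);

        if (cols[i].dropped)
            continue;

        /* attribute numbers are 1-based, array positions are 0-based */
        for (int j = 0; j < nkeyatts; j++)
        {
            if (indkey[j] == i + 1)
            {
                is_key = true;
                break;
            }
        }
        if (is_key)
            continue;

        snprintf(out + used, outlen - used, "%s%s = EXCLUDED.%s",
                 first ? "" : ", ", cols[i].name, cols[i].name);
        first = false;
    }
    return !first;
}
```

For an index on (id) INCLUDE (a) over columns id, a, a dropped column and b, `indkey` is {1, 2} with `nkeyatts` = 1, which gives the target `id` and the SET list `a = EXCLUDED.a, b = EXCLUDED.b`. Bounding the loops by indnatts (2) instead yields the target `id, a` and drops `a` from the SET list; since ON CONFLICT inference matches on index key columns, that target would presumably fail to find the index.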
