On Fri, Mar 31, 2023 at 05:49:21AM +0000, fujii.y...@df.mitsubishielectric.co.jp wrote:
> Hi Mr.Momjian
>
> > First, am I correct?
> Yes, you are correct. This patch uses new special aggregate functions for
> partial aggregate
> (then we call this partialaggfunc).

First, my apologies for not addressing this sooner.  I was so focused on
my own tasks that I didn't realize this very important patch was not
getting attention.  I will try my best to get it into PG 17.

What amazes me is that you didn't need to create _any_ actual aggregate
functions.  Rather, you just needed to hook existing functions into the
aggregate tables for partial FDW execution.

> > Second, how far away is this from being committable
> > and/or what work needs to be done to get it committable, either for PG 16
> > or 17?
> I believe there are three: 1. and 2. are not clear if they are necessary or
> not; 3. are clearly necessary.
> I would like to hear the opinions of the development community on whether or
> not 1. and 2. need to be addressed.
>
> 1. Making partialaggfunc user-defined function
> In v17, I make partialaggfuncs as built-in functions.
> Because of this approach, v17 changes specification of BKI file and
> pg_aggregate.
> For now, partialaggfuncs are needed by only postgres_fdw which is just an
> extension of PostgreSQL.
> In the future, when revising the specifications for BKI files and
> pg_aggregate when modifying existing PostgreSQL functions,
> It is necessary to align them with this patch's changes.
> I am concerned that this may be undesirable.
> So I am thinking that v17 should be modified to making partialaggfunc as user
> defined function.

I think we have three possible cases for aggregates pushdown to FDWs:

1)  Postgres built-in aggregate functions
2)  Postgres user-defined & extension aggregate functions
3)  aggregate functions calls to non-PG FDWs

Your patch handles #1 by checking that the FDW Postgres version is the
same as the calling Postgres version.  However, it doesn't check for
extension versions, and frankly, I don't see how we could implement that
cleanly without significant complexity.

I suggest we remove the version check requirement --- instead just
document that the FDW Postgres version should be the same or newer than
the calling Postgres server --- that way, we can assume that whatever is
in the system catalogs of the caller is on the receiving side.  We should
add a GUC to turn off this optimization for cases where the FDW Postgres
version is older than the caller.  This handles cases 1 and 2.

For case 3, I don't even know how much pushdown those do of _any_
aggregates to non-PG servers, let alone parallel FDW ones.  Does anyone
know the details?

> 2. Automation of creating definition of partialaggfuncs
> In development of v17, I manually create definition of partialaggfuncs for
> avg, min, max, sum, count.
> I am concerned that this may be undesirable.
> So I am thinking that v17 should be modified to automate creating definition
> of partialaggfuncs
> for all built-in aggregate functions.

Are there any other builtin functions that need this?  I think we can
just provide documentation for extensions on how to do this.

> 3. Documentation
> I need add explanation of partialaggfunc to documents on postgres_fdw and
> other places.

I can help with that once we decide on the above.

I think 'partialaggfn' should be named 'aggpartialfn' to match other
columns in pg_aggregate.

I am confused by these changes to pg_aggregate:

	+{ aggfnoid => 'sum_p_int8', aggtransfn => 'int8_avg_accum',
	+  aggfinalfn => 'int8_avg_serialize', aggcombinefn => 'int8_avg_combine',
	+  aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
	+  aggtranstype => 'internal', aggtransspace => '48' },
	...
	+{ aggfnoid => 'sum_p_numeric', aggtransfn => 'numeric_avg_accum',
	+  aggfinalfn => 'numeric_avg_serialize', aggcombinefn => 'numeric_avg_combine',
	+  aggserialfn => 'numeric_avg_serialize',
	+  aggdeserialfn => 'numeric_avg_deserialize',
	+  aggtranstype => 'internal', aggtransspace => '128' },

Why are these marked as 'sum' but use 'avg' functions?
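
On the 'sum' versus 'avg' question, one possible reading (an assumption, not something confirmed in this thread) is that the two aggregates share a single transition state and differ only in their final step. The regression test added in this patch defines postgres_fdw_sum with SFUNC = int8_avg_accum and FINALFUNC = numeric_poly_sum, which fits that reading. A minimal C sketch of the idea follows; the names AvgState, avg_accum, avg_combine, final_sum and final_avg are hypothetical, and plain int64 arithmetic stands in for the real internal state:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the shared transition state: running sum plus row count. */
typedef struct AvgState
{
	int64_t		sum;
	int64_t		count;
} AvgState;

/* Transition step: fold one input value into the state. */
static void
avg_accum(AvgState *st, int64_t value)
{
	st->sum += value;
	st->count++;
}

/* Combine step: merge a partial state (e.g. from one partition) into another. */
static void
avg_combine(AvgState *dst, const AvgState *src)
{
	dst->sum += src->sum;
	dst->count += src->count;
}

/* Final step for a sum-like aggregate: the count is simply ignored. */
static int64_t
final_sum(const AvgState *st)
{
	return st->sum;
}

/* Final step for an avg-like aggregate: uses both fields of the same state. */
static double
final_avg(const AvgState *st)
{
	assert(st->count > 0);		/* this sketch does not model the empty-input case */
	return (double) st->sum / (double) st->count;
}
```

Under this reading, a partial aggregate for either function ships the same kind of state, and only the finalize step on the calling server differs.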

It would be good to explain exactly how this is different from background
worker parallelism.

-- 
  Bruce Momjian  <br...@momjian.us>        https://momjian.us
  EDB                                      https://enterprisedb.com

  Embrace your flaws.  They make you human, rather than perfect,
  which you will never be.
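
The version rule used by the attached patch and the simpler on/off switch suggested in this message can be compared with a small C sketch. Everything here is illustrative: the names MINVERSION_UNSET, patch_rule_allows and switch_rule_allows are hypothetical, the sentinel value 0 is an assumption, and the proposed GUC does not exist. It shows only the two decision rules, not the actual implementation.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical sentinel meaning "the aggregate declares no explicit minimum version". */
#define MINVERSION_UNSET 0

/*
 * Rule in the patch: push down only if the remote server is at least as new
 * as the aggregate's declared minimum version, falling back to the local
 * server version when no minimum is declared.
 */
static bool
patch_rule_allows(int local_version, int agg_minversion, int remote_version)
{
	int			required;

	assert(local_version > 0);
	required = (agg_minversion != MINVERSION_UNSET) ? agg_minversion : local_version;
	return remote_version >= required;
}

/*
 * Rule suggested in this message: no version comparison at all; the
 * administrator disables pushdown when the remote server is older.
 */
static bool
switch_rule_allows(bool pushdown_enabled)
{
	return pushdown_enabled;
}
```

The first rule needs the remote version to be supplied per foreign server; the second relies on documentation plus a single setting.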

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 09d6dd60dd..fe219b8d2f 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -202,7 +202,7 @@ static bool is_subquery_var(Var *node, RelOptInfo *foreignrel,
 							int *relno, int *colno);
 static void get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
 										  int *relno, int *colno);
-
+static bool partial_agg_ok(Aggref* agg, PgFdwRelationInfo* fpinfo);
 
 /*
  * Examine each qual clause in input_conds, and classify them into two groups,
@@ -907,8 +907,9 @@ foreign_expr_walker(Node *node,
 				if (!IS_UPPER_REL(glob_cxt->foreignrel))
 					return false;
 
-				/* Only non-split aggregates are pushable. */
-				if (agg->aggsplit != AGGSPLIT_SIMPLE)
+				if ((agg->aggsplit != AGGSPLIT_SIMPLE) && (agg->aggsplit != AGGSPLIT_INITIAL_SERIAL))
+					return false;
+				if (agg->aggsplit == AGGSPLIT_INITIAL_SERIAL && !partial_agg_ok(agg, fpinfo))
 					return false;
 
 				/* As usual, it must be shippable. */
@@ -3517,14 +3518,37 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
 	StringInfo	buf = context->buf;
 	bool		use_variadic;
 
-	/* Only basic, non-split aggregation accepted. */
-	Assert(node->aggsplit == AGGSPLIT_SIMPLE);
+
+	Assert((node->aggsplit == AGGSPLIT_SIMPLE) ||
+		(node->aggsplit == AGGSPLIT_INITIAL_SERIAL));
 
 	/* Check if need to print VARIADIC (cf. ruleutils.c) */
 	use_variadic = node->aggvariadic;
 
-	/* Find aggregate name from aggfnoid which is a pg_proc entry */
-	appendFunctionName(node->aggfnoid, context);
+	if (node->aggsplit == AGGSPLIT_SIMPLE) {
+		/* Find aggregate name from aggfnoid which is a pg_proc entry */
+		appendFunctionName(node->aggfnoid, context);
+	}
+	else {
+		HeapTuple			aggtup;
+		Form_pg_aggregate	aggform;
+
+		/* Find aggregate name from aggfnoid or partialaggfn which is a pg_proc entry */
+		aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(node->aggfnoid));
+		if (!HeapTupleIsValid(aggtup))
+			elog(ERROR, "cache lookup failed for aggregate %u", node->aggfnoid);
+		aggform = (Form_pg_aggregate)GETSTRUCT(aggtup);
+
+		if ((aggform->aggtranstype != INTERNALOID) && (aggform->aggfinalfn == InvalidOid)) {
+			appendFunctionName(node->aggfnoid, context);
+		} else if(aggform->partialaggfn) {
+			appendFunctionName(aggform->partialaggfn, context);
+		} else {
+			elog(ERROR, "there is no partialaggfn %u", node->aggfnoid);
+		}
+		ReleaseSysCache(aggtup);
+	}
+
 	appendStringInfoChar(buf, '(');
 
 	/* Add DISTINCT */
@@ -4047,3 +4071,62 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
 	/* Shouldn't get here */
 	elog(ERROR, "unexpected expression in subquery output");
 }
+
+/*
+ * Check that partial aggregate function, described by aggform,
+ * exists on remote server, described by fpinfo.
+ */
+static bool
+partial_agg_compatible(Form_pg_aggregate aggform, PgFdwRelationInfo *fpinfo)
+{
+	int32 partialagg_minversion = PG_VERSION_NUM;
+	if (aggform->partialagg_minversion != PARTIALAGG_MINVERSION_DEFAULT) {
+		partialagg_minversion = aggform->partialagg_minversion;
+	}
+	return (fpinfo->server_version >= partialagg_minversion);
+}
+
+/*
+ * Check that partial aggregate agg is fine to push down.
+ *
+ * It is fine when all of the following conditions are true.
+ * condition1) agg is AGGKIND_NORMAL aggregate which contains no distinct + * or order by clauses + * condition2) there is an aggregate function for partial aggregation + * (then we call this partialaggfunc) corresponding to agg. + * condition2 is true when either of the following cases is true. + * case2-1) return type of agg is not internal and agg has no finalfunc. + * In this case, partialaggfunc is agg itself. + * case2-2) agg has valid partialaggfn and partialagg_minversion <= server_version. + * In this case, partialaggfunc is a aggregate function whose oid is partialaggfn. + */ +static bool +partial_agg_ok(Aggref* agg, PgFdwRelationInfo *fpinfo) +{ + HeapTuple aggtup; + Form_pg_aggregate aggform; + bool partial_agg_ok = true; + + Assert(agg->aggsplit == AGGSPLIT_INITIAL_SERIAL); + + /* We don't support complex partial aggregates */ + if (agg->aggdistinct || agg->aggkind != AGGKIND_NORMAL || agg->aggorder != NIL) + return false; + + aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(agg->aggfnoid)); + if (!HeapTupleIsValid(aggtup)) + elog(ERROR, "cache lookup failed for aggregate %u", agg->aggfnoid); + aggform = (Form_pg_aggregate)GETSTRUCT(aggtup); + + if ((aggform->aggtranstype == INTERNALOID) || (aggform->aggfinalfn != InvalidOid)) { + /* Only aggregates which has partialaggfn, are allowed */ + if (!aggform->partialaggfn) { + partial_agg_ok = false; + } else if (!partial_agg_compatible(aggform, fpinfo)) { + partial_agg_ok = false; + } + } + + ReleaseSysCache(aggtup); + return partial_agg_ok; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 8f6a04f71b..ebdb7cff3b 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9677,13 +9677,34 @@ RESET enable_partitionwise_join; -- =================================================================== -- test partitionwise aggregates -- 
=================================================================== -CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a); +CREATE OR REPLACE FUNCTION f_server_version_lag(version_offset int4) RETURNS text AS $$ + SELECT (setting::int4 + version_offset)::text FROM pg_settings WHERE name = 'server_version_num'; +$$ LANGUAGE sql; +CREATE OR REPLACE FUNCTION f_alter_server_version(server_name text, operation text, version_offset int4) RETURNS void AS $$ + DECLARE + version_num text; + BEGIN + EXECUTE 'SELECT f_server_version_lag($1) ' + INTO version_num + USING version_offset; + EXECUTE format('ALTER SERVER %I OPTIONS (%I server_version %L)', + server_name, operation, version_num); + RETURN; + END; +$$ LANGUAGE plpgsql; +SELECT f_alter_server_version('loopback', 'add', 0); + f_alter_server_version +------------------------ + +(1 row) + +CREATE TABLE pagg_tab (a int, b int, c text, d int4) PARTITION BY RANGE(a); CREATE TABLE pagg_tab_p1 (LIKE pagg_tab); CREATE TABLE pagg_tab_p2 (LIKE pagg_tab); CREATE TABLE pagg_tab_p3 (LIKE pagg_tab); -INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10; -INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10; -INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20; +INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10; +INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10; +INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20; -- Create foreign partitions CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab 
FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1'); CREATE FOREIGN TABLE fpagg_tab_p2 PARTITION OF pagg_tab FOR VALUES FROM (10) TO (20) SERVER loopback OPTIONS (table_name 'pagg_tab_p2'); @@ -9742,8 +9763,8 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O -- Should have all the columns in the target list for the given relation EXPLAIN (VERBOSE, COSTS OFF) SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1; - QUERY PLAN ------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Sort Output: t1.a, (count(((t1.*)::pagg_tab))) Sort Key: t1.a @@ -9754,21 +9775,21 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1; Filter: (avg(t1.b) < '22'::numeric) -> Foreign Scan on public.fpagg_tab_p1 t1 Output: t1.a, t1.*, t1.b - Remote SQL: SELECT a, b, c FROM public.pagg_tab_p1 + Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p1 -> HashAggregate Output: t1_1.a, count(((t1_1.*)::pagg_tab)) Group Key: t1_1.a Filter: (avg(t1_1.b) < '22'::numeric) -> Foreign Scan on public.fpagg_tab_p2 t1_1 Output: t1_1.a, t1_1.*, t1_1.b - Remote SQL: SELECT a, b, c FROM public.pagg_tab_p2 + Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p2 -> HashAggregate Output: t1_2.a, count(((t1_2.*)::pagg_tab)) Group Key: t1_2.a Filter: (avg(t1_2.b) < '22'::numeric) -> Foreign Scan on public.fpagg_tab_p3 t1_2 Output: t1_2.a, t1_2.*, t1_2.b - Remote SQL: SELECT a, b, c FROM public.pagg_tab_p3 + Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p3 (25 rows) SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1; @@ -9804,6 +9825,263 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 -> Foreign Scan on fpagg_tab_p3 pagg_tab_2 (15 rows) +-- It's unsafe to push down having clause when there are partial aggregates +EXPLAIN (VERBOSE, 
COSTS OFF) +SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Sort + Output: pagg_tab.b, (max(pagg_tab.a)), (count(*)) + Sort Key: pagg_tab.b + -> Finalize HashAggregate + Output: pagg_tab.b, max(pagg_tab.a), count(*) + Group Key: pagg_tab.b + Filter: (sum(pagg_tab.a) < 700) + -> Append + -> Partial HashAggregate + Output: pagg_tab.b, PARTIAL max(pagg_tab.a), PARTIAL count(*), PARTIAL sum(pagg_tab.a) + Group Key: pagg_tab.b + -> Foreign Scan on public.fpagg_tab_p1 pagg_tab + Output: pagg_tab.b, pagg_tab.a + Remote SQL: SELECT a, b FROM public.pagg_tab_p1 + -> Partial HashAggregate + Output: pagg_tab_1.b, PARTIAL max(pagg_tab_1.a), PARTIAL count(*), PARTIAL sum(pagg_tab_1.a) + Group Key: pagg_tab_1.b + -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_1 + Output: pagg_tab_1.b, pagg_tab_1.a + Remote SQL: SELECT a, b FROM public.pagg_tab_p2 + -> Partial HashAggregate + Output: pagg_tab_2.b, PARTIAL max(pagg_tab_2.a), PARTIAL count(*), PARTIAL sum(pagg_tab_2.a) + Group Key: pagg_tab_2.b + -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_2 + Output: pagg_tab_2.b, pagg_tab_2.a + Remote SQL: SELECT a, b FROM public.pagg_tab_p3 +(26 rows) + +-- Partial aggregates are fine to push down without having clause +EXPLAIN (VERBOSE, COSTS OFF) +SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: pagg_tab.b, (min(pagg_tab.a)), (max(pagg_tab.a)), (count(*)), (sum(pagg_tab.d)), (sum((pagg_tab.d)::bigint)), (avg(pagg_tab.d)), (avg((pagg_tab.d)::bigint)) + Sort Key: pagg_tab.b + -> 
Finalize HashAggregate + Output: pagg_tab.b, min(pagg_tab.a), max(pagg_tab.a), count(*), sum(pagg_tab.d), sum((pagg_tab.d)::bigint), avg(pagg_tab.d), avg((pagg_tab.d)::bigint) + Group Key: pagg_tab.b + -> Append + -> Foreign Scan + Output: pagg_tab.b, (PARTIAL min(pagg_tab.a)), (PARTIAL max(pagg_tab.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab.d)), (PARTIAL sum((pagg_tab.d)::bigint)), (PARTIAL avg(pagg_tab.d)), (PARTIAL avg((pagg_tab.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab) + Remote SQL: SELECT b, min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p1 GROUP BY 1 + -> Foreign Scan + Output: pagg_tab_1.b, (PARTIAL min(pagg_tab_1.a)), (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_1.d)), (PARTIAL sum((pagg_tab_1.d)::bigint)), (PARTIAL avg(pagg_tab_1.d)), (PARTIAL avg((pagg_tab_1.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1) + Remote SQL: SELECT b, min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p2 GROUP BY 1 + -> Foreign Scan + Output: pagg_tab_2.b, (PARTIAL min(pagg_tab_2.a)), (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_2.d)), (PARTIAL sum((pagg_tab_2.d)::bigint)), (PARTIAL avg(pagg_tab_2.d)), (PARTIAL avg((pagg_tab_2.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2) + Remote SQL: SELECT b, min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p3 GROUP BY 1 +(19 rows) + +SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1; + b | min | max | count | sum | sum | avg | avg +----+-----+-----+-------+------+------+---------------------+--------------------- + 0 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000 + 1 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 
16.0000000000000000 + 2 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000 + 3 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000 + 4 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000 + 5 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000 + 6 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000 + 7 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000 + 8 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000 + 9 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000 + 10 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000 + 11 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000 + 12 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000 + 13 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000 + 14 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000 + 15 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000 + 16 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000 + 17 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000 + 18 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000 + 19 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000 + 20 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000 + 21 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000 + 22 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000 + 23 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000 + 24 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000 + 25 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000 + 26 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000 + 27 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 
22.0000000000000000 + 28 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000 + 29 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000 + 30 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000 + 31 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000 + 32 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000 + 33 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000 + 34 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000 + 35 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000 + 36 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000 + 37 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000 + 38 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000 + 39 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000 + 40 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000 + 41 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000 + 42 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000 + 43 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000 + 44 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000 + 45 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000 + 46 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000 + 47 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000 + 48 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000 + 49 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000 +(50 rows) + +-- Partial aggregates are fine to push down +EXPLAIN (VERBOSE, COSTS OFF) +SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab; + QUERY PLAN 
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Finalize Aggregate + Output: min(pagg_tab.a), max(pagg_tab.a), count(*), sum(pagg_tab.d), sum((pagg_tab.d)::bigint), avg(pagg_tab.d), avg((pagg_tab.d)::bigint) + -> Append + -> Foreign Scan + Output: (PARTIAL min(pagg_tab.a)), (PARTIAL max(pagg_tab.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab.d)), (PARTIAL sum((pagg_tab.d)::bigint)), (PARTIAL avg(pagg_tab.d)), (PARTIAL avg((pagg_tab.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab) + Remote SQL: SELECT min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p1 + -> Foreign Scan + Output: (PARTIAL min(pagg_tab_1.a)), (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_1.d)), (PARTIAL sum((pagg_tab_1.d)::bigint)), (PARTIAL avg(pagg_tab_1.d)), (PARTIAL avg((pagg_tab_1.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1) + Remote SQL: SELECT min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p2 + -> Foreign Scan + Output: (PARTIAL min(pagg_tab_2.a)), (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_2.d)), (PARTIAL sum((pagg_tab_2.d)::bigint)), (PARTIAL avg(pagg_tab_2.d)), (PARTIAL avg((pagg_tab_2.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2) + Remote SQL: SELECT min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p3 +(15 rows) + +SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab; + min | max | count | sum | sum | avg | avg +-----+-----+-------+-------+-------+---------------------+--------------------- + 0 | 29 | 3000 | 58500 | 58500 | 19.5000000000000000 
| 19.5000000000000000 +(1 row) + +-- It's unsafe to push down partial aggregates which contains distinct clause +EXPLAIN (VERBOSE, COSTS OFF) +SELECT max(a), count(distinct b) FROM pagg_tab; + QUERY PLAN +----------------------------------------------------------------------------------------- + Aggregate + Output: max(pagg_tab.a), count(DISTINCT pagg_tab.b) + -> Merge Append + Sort Key: pagg_tab.b + -> Foreign Scan on public.fpagg_tab_p1 pagg_tab_1 + Output: pagg_tab_1.a, pagg_tab_1.b + Remote SQL: SELECT a, b FROM public.pagg_tab_p1 ORDER BY b ASC NULLS LAST + -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_2 + Output: pagg_tab_2.a, pagg_tab_2.b + Remote SQL: SELECT a, b FROM public.pagg_tab_p2 ORDER BY b ASC NULLS LAST + -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_3 + Output: pagg_tab_3.a, pagg_tab_3.b + Remote SQL: SELECT a, b FROM public.pagg_tab_p3 ORDER BY b ASC NULLS LAST +(13 rows) + +SELECT max(a), count(distinct b) FROM pagg_tab; + max | count +-----+------- + 29 | 50 +(1 row) + +-- It's unsafe to push down partial aggregates when +-- server_version is older than partialagg_minversion, described by pg_aggregate +SELECT f_alter_server_version('loopback', 'set', -1); + f_alter_server_version +------------------------ + +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT avg(d) FROM pagg_tab; + QUERY PLAN +------------------------------------------------------------------ + Finalize Aggregate + Output: avg(pagg_tab.d) + -> Append + -> Partial Aggregate + Output: PARTIAL avg(pagg_tab.d) + -> Foreign Scan on public.fpagg_tab_p1 pagg_tab + Output: pagg_tab.d + Remote SQL: SELECT d FROM public.pagg_tab_p1 + -> Partial Aggregate + Output: PARTIAL avg(pagg_tab_1.d) + -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_1 + Output: pagg_tab_1.d + Remote SQL: SELECT d FROM public.pagg_tab_p2 + -> Partial Aggregate + Output: PARTIAL avg(pagg_tab_2.d) + -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_2 + Output: pagg_tab_2.d + Remote SQL: SELECT d FROM public.pagg_tab_p3 
+(18 rows) + +SELECT avg(d) FROM pagg_tab; + avg +--------------------- + 19.5000000000000000 +(1 row) + +-- It's safe to push down partial aggregate for user defined aggregate +-- which has valid options. +CREATE AGGREGATE postgres_fdw_sum_p_int8(int8) ( + SFUNC = int8_avg_accum, + STYPE = internal, + FINALFUNC = int8_avg_serialize +); +CREATE AGGREGATE postgres_fdw_sum(int8) ( + SFUNC = int8_avg_accum, + STYPE = internal, + COMBINEFUNC = int8_avg_combine, + FINALFUNC = numeric_poly_sum, + SERIALFUNC = int8_avg_serialize, + DESERIALFUNC = int8_avg_deserialize, + PARTIALAGGFUNC = postgres_fdw_sum_p_int8, + PARTIALAGG_MINVERSION = 150000 +); +ALTER EXTENSION postgres_fdw ADD FUNCTION postgres_fdw_sum(int8); +EXPLAIN (VERBOSE, COSTS OFF) +SELECT postgres_fdw_sum(d::int8) FROM pagg_tab; + QUERY PLAN +---------------------------------------------------------------------------------------------------- + Finalize Aggregate + Output: postgres_fdw_sum((pagg_tab.d)::bigint) + -> Append + -> Foreign Scan + Output: (PARTIAL postgres_fdw_sum((pagg_tab.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab) + Remote SQL: SELECT public.postgres_fdw_sum_p_int8(d::bigint) FROM public.pagg_tab_p1 + -> Foreign Scan + Output: (PARTIAL postgres_fdw_sum((pagg_tab_1.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1) + Remote SQL: SELECT public.postgres_fdw_sum_p_int8(d::bigint) FROM public.pagg_tab_p2 + -> Foreign Scan + Output: (PARTIAL postgres_fdw_sum((pagg_tab_2.d)::bigint)) + Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2) + Remote SQL: SELECT public.postgres_fdw_sum_p_int8(d::bigint) FROM public.pagg_tab_p3 +(15 rows) + +SELECT postgres_fdw_sum(d::int8) FROM pagg_tab; + postgres_fdw_sum +------------------ + 58500 +(1 row) + +ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_sum(int8); +DROP AGGREGATE postgres_fdw_sum(int8); +DROP AGGREGATE postgres_fdw_sum_p_int8(int8); +DROP FUNCTION f_server_version_lag(int4); +DROP FUNCTION 
f_alter_server_version(text, text, int4); +ALTER SERVER loopback OPTIONS (DROP server_version); -- =================================================================== -- access rights and superuser -- =================================================================== diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 4229d2048c..75283fa2d8 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -211,6 +211,27 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) errmsg("sslcert and sslkey are superuser-only"), errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser."))); } + else if (strcmp(def->defname, "server_version") == 0) + { + char* value; + int int_val; + bool is_parsed; + + value = defGetString(def); + is_parsed = parse_int(value, &int_val, 0, NULL); + + if (!is_parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for integer option \"%s\": %s", + def->defname, value))); + + if (int_val <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" must be an integer value greater than zero", + def->defname))); + } else if (strcmp(def->defname, "analyze_sampling") == 0) { char *value; @@ -288,6 +309,9 @@ InitPgFdwOptions(void) {"sslcert", UserMappingRelationId, true}, {"sslkey", UserMappingRelationId, true}, + /* options for partial aggregation pushdown */ + {"server_version", ForeignServerRelationId, false}, + {NULL, InvalidOid, false} }; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index d728bd70b3..7b51acadf3 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -519,7 +519,7 @@ static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra); static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, - 
Node *havingQual);
+                                Node *havingQual, PartitionwiseAggregateType patype);
 static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel);
 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
@@ -651,6 +651,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
     fpinfo->shippable_extensions = NIL;
     fpinfo->fetch_size = 100;
     fpinfo->async_capable = false;
+    fpinfo->server_version = 0;
     apply_server_options(fpinfo);
     apply_table_options(fpinfo);
@@ -6130,6 +6131,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
             (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
         else if (strcmp(def->defname, "async_capable") == 0)
             fpinfo->async_capable = defGetBoolean(def);
+        else if (strcmp(def->defname, "server_version") == 0)
+            (void)parse_int(defGetString(def), &fpinfo->server_version, 0, NULL);
     }
 }
@@ -6188,6 +6191,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
     fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
     fpinfo->fetch_size = fpinfo_o->fetch_size;
     fpinfo->async_capable = fpinfo_o->async_capable;
+    fpinfo->server_version = fpinfo_o->server_version;
     /* Merge the table level options from either side of the join. */
     if (fpinfo_i)
@@ -6367,7 +6371,7 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
  */
 static bool
 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
-                    Node *havingQual)
+                    Node *havingQual, PartitionwiseAggregateType patype)
 {
     Query *query = root->parse;
     PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
@@ -6381,6 +6385,10 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
     if (query->groupingSets)
         return false;
+    /* It's unsafe to push having statements with partial aggregates */
+    if ((patype == PARTITIONWISE_AGGREGATE_PARTIAL) && havingQual)
+        return false;
+
     /* Get the fpinfo of the underlying scan relation. */
     ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
@@ -6619,6 +6627,7 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
     /* Ignore stages we don't support; and skip any duplicate calls. */
     if ((stage != UPPERREL_GROUP_AGG &&
+         stage != UPPERREL_PARTIAL_GROUP_AGG &&
          stage != UPPERREL_ORDERED &&
          stage != UPPERREL_FINAL) ||
         output_rel->fdw_private)
@@ -6635,6 +6644,10 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
             add_foreign_grouping_paths(root, input_rel, output_rel,
                                        (GroupPathExtraData *) extra);
             break;
+        case UPPERREL_PARTIAL_GROUP_AGG:
+            add_foreign_grouping_paths(root, input_rel, output_rel,
+                                       (GroupPathExtraData*)extra);
+            break;
         case UPPERREL_ORDERED:
             add_foreign_ordered_paths(root, input_rel, output_rel);
             break;
@@ -6675,7 +6688,8 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
         return;
     Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
-           extra->patype == PARTITIONWISE_AGGREGATE_FULL);
+           extra->patype == PARTITIONWISE_AGGREGATE_FULL ||
+           extra->patype == PARTITIONWISE_AGGREGATE_PARTIAL);
     /* save the input_rel as outerrel in fpinfo */
     fpinfo->outerrel = input_rel;
@@ -6695,7 +6709,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
      * Use HAVING qual from extra. In case of child partition, it will have
      * translated Vars.
      */
-    if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
+    if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual, extra->patype))
         return;
     /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 02c1152319..616f559080 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -88,6 +88,9 @@ typedef struct PgFdwRelationInfo
     int fetch_size; /* fetch size for this remote table */
+    /* Options for checking compatibility of partial aggregation */
+    int server_version;
+
     /*
      * Name of the relation, for use while EXPLAINing ForeignScan. It is used
      * for join and upper relations but is set for all relations. For a base
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 5bd69339df..dcbf60d919 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2944,16 +2944,34 @@ RESET enable_partitionwise_join;
 -- ===================================================================
 -- test partitionwise aggregates
 -- ===================================================================
+CREATE OR REPLACE FUNCTION f_server_version_lag(version_offset int4) RETURNS text AS $$
+    SELECT (setting::int4 + version_offset)::text FROM pg_settings WHERE name = 'server_version_num';
+$$ LANGUAGE sql;
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE OR REPLACE FUNCTION f_alter_server_version(server_name text, operation text, version_offset int4) RETURNS void AS $$
+    DECLARE
+        version_num text;
+    BEGIN
+        EXECUTE 'SELECT f_server_version_lag($1) '
+            INTO version_num
+            USING version_offset;
+        EXECUTE format('ALTER SERVER %I OPTIONS (%I server_version %L)',
+            server_name, operation, version_num);
+        RETURN;
+    END;
+$$ LANGUAGE plpgsql;
+
+SELECT f_alter_server_version('loopback', 'add', 0);
+
+CREATE TABLE pagg_tab (a int, b int, c text, d int4) PARTITION BY RANGE(a);
 CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
 CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
 CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
 -- Create foreign partitions
 CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
@@ -2987,6 +3005,65 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
 EXPLAIN (COSTS OFF)
 SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1;
+SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1;
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab;
+SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab;
+
+-- It's unsafe to push down partial aggregates which contains distinct clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+SELECT max(a), count(distinct b) FROM pagg_tab;
+
+-- It's unsafe to push down partial aggregates when
+-- server_version is older than partialagg_minversion, described by pg_aggregate
+SELECT f_alter_server_version('loopback', 'set', -1);
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT avg(d) FROM pagg_tab;
+SELECT avg(d) FROM pagg_tab;
+
+-- It's safe to push down partial aggregate for user defined aggregate
+-- which has valid options.
+CREATE AGGREGATE postgres_fdw_sum_p_int8(int8) (
+    SFUNC = int8_avg_accum,
+    STYPE = internal,
+    FINALFUNC = int8_avg_serialize
+);
+
+CREATE AGGREGATE postgres_fdw_sum(int8) (
+    SFUNC = int8_avg_accum,
+    STYPE = internal,
+    COMBINEFUNC = int8_avg_combine,
+    FINALFUNC = numeric_poly_sum,
+    SERIALFUNC = int8_avg_serialize,
+    DESERIALFUNC = int8_avg_deserialize,
+    PARTIALAGGFUNC = postgres_fdw_sum_p_int8,
+    PARTIALAGG_MINVERSION = 150000
+);
+
+ALTER EXTENSION postgres_fdw ADD FUNCTION postgres_fdw_sum(int8);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT postgres_fdw_sum(d::int8) FROM pagg_tab;
+SELECT postgres_fdw_sum(d::int8) FROM pagg_tab;
+
+ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_sum(int8);
+DROP AGGREGATE postgres_fdw_sum(int8);
+DROP AGGREGATE postgres_fdw_sum_p_int8(int8);
+
+DROP FUNCTION f_server_version_lag(int4);
+DROP FUNCTION f_alter_server_version(text, text, int4);
+ALTER SERVER loopback OPTIONS (DROP server_version);
+
 -- ===================================================================
 -- access rights and superuser
 -- ===================================================================
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index ebc4454743..037eb06f4d 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -63,6 +63,7 @@ AggregateCreate(const char *aggName,
                 List *aggmtransfnName,
                 List *aggminvtransfnName,
                 List *aggmfinalfnName,
+                List *partialaggfnName,
                 bool finalfnExtraArgs,
                 bool mfinalfnExtraArgs,
                 char finalfnModify,
@@ -72,6 +73,7 @@ AggregateCreate(const char *aggName,
                 int32 aggTransSpace,
                 Oid aggmTransType,
                 int32 aggmTransSpace,
+                int32 partialaggMinversion,
                 const char *agginitval,
                 const char *aggminitval,
                 char proparallel)
@@ -91,6 +93,7 @@ AggregateCreate(const char *aggName,
     Oid mtransfn = InvalidOid; /* can be omitted */
     Oid minvtransfn = InvalidOid; /* can be omitted */
     Oid mfinalfn = InvalidOid; /* can be omitted */
+    Oid partialaggfn = InvalidOid; /* can be omitted */
     Oid sortop = InvalidOid; /* can be omitted */
     Oid *aggArgTypes = parameterTypes->values;
     bool mtransIsStrict = false;
@@ -569,6 +572,33 @@ AggregateCreate(const char *aggName,
                         format_type_be(finaltype))));
     }
+    /*
+     * Validate the partial aggregate function, if present.
+     */
+    if (partialaggfnName)
+    {
+        HeapTuple aggtup;
+        partialaggfn = LookupFuncName(partialaggfnName, numArgs, aggArgTypes, false);
+
+        /* Check aggregate creator has permission to call the function */
+        aclresult = object_aclcheck(ProcedureRelationId, partialaggfn, GetUserId(), ACL_EXECUTE);
+        if (aclresult != ACLCHECK_OK)
+            aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(partialaggfn));
+
+        rettype = get_func_rettype(partialaggfn);
+
+        if (((aggTransType == INTERNALOID) && (rettype != BYTEAOID))
+            || ((aggTransType != INTERNALOID) && (rettype != aggTransType)))
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("return type of partialaggfunc of %s is not stype",
+                            NameListToString(partialaggfnName))));
+        aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(partialaggfn));
+        if (!HeapTupleIsValid(aggtup))
+            elog(ERROR, "cache lookup failed for partialaggfunc %u", partialaggfn);
+        ReleaseSysCache(aggtup);
+    }
+
     /* handle sortop, if supplied */
     if (aggsortopName)
     {
@@ -684,6 +714,8 @@ AggregateCreate(const char *aggName,
         values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval);
     else
         nulls[Anum_pg_aggregate_aggminitval - 1] = true;
+    values[Anum_pg_aggregate_partialaggfn - 1] = ObjectIdGetDatum(partialaggfn);
+    values[Anum_pg_aggregate_partialagg_minversion - 1] = Int32GetDatum(partialaggMinversion);
     if (replace)
         oldtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(procOid));
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index fda9d1aa77..cb49a79cfa 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -70,6 +70,7 @@ DefineAggregate(ParseState *pstate,
     List *combinefuncName = NIL;
     List *serialfuncName = NIL;
     List *deserialfuncName = NIL;
+    List *partialaggfuncName = NIL;
     List *mtransfuncName = NIL;
     List *minvtransfuncName = NIL;
     List *mfinalfuncName = NIL;
@@ -83,6 +84,7 @@ DefineAggregate(ParseState *pstate,
     TypeName *mtransType = NULL;
     int32 transSpace = 0;
     int32 mtransSpace = 0;
+    int32 partialaggMinversion = 0;
     char *initval = NULL;
     char *minitval = NULL;
     char *parallel = NULL;
@@ -143,6 +145,8 @@ DefineAggregate(ParseState *pstate,
             serialfuncName = defGetQualifiedName(defel);
         else if (strcmp(defel->defname, "deserialfunc") == 0)
             deserialfuncName = defGetQualifiedName(defel);
+        else if (strcmp(defel->defname, "partialaggfunc") == 0)
+            partialaggfuncName = defGetQualifiedName(defel);
         else if (strcmp(defel->defname, "msfunc") == 0)
             mtransfuncName = defGetQualifiedName(defel);
         else if (strcmp(defel->defname, "minvfunc") == 0)
@@ -182,6 +186,8 @@ DefineAggregate(ParseState *pstate,
             mtransType = defGetTypeName(defel);
         else if (strcmp(defel->defname, "msspace") == 0)
             mtransSpace = defGetInt32(defel);
+        else if (strcmp(defel->defname, "partialagg_minversion") == 0)
+            partialaggMinversion = defGetInt32(defel);
         else if (strcmp(defel->defname, "initcond") == 0)
             initval = defGetString(defel);
         else if (strcmp(defel->defname, "initcond1") == 0)
@@ -461,6 +467,7 @@ DefineAggregate(ParseState *pstate,
                            mtransfuncName, /* fwd trans function name */
                            minvtransfuncName, /* inv trans function name */
                            mfinalfuncName, /* final function name */
+                           partialaggfuncName,
                            finalfuncExtraArgs,
                            mfinalfuncExtraArgs,
                            finalfuncModify,
@@ -470,6 +477,7 @@ DefineAggregate(ParseState *pstate,
                            transSpace, /* transition space */
                            mtransTypeId, /* transition data type */
                            mtransSpace, /* transition space */
+                           partialaggMinversion,
                            initval, /* initial condition */
                            minitval, /* initial condition */
                            proparallel); /* parallel safe? */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7a504dfe25..80165ada03 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -13693,6 +13693,8 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
     const char *aggmtransfn;
     const char *aggminvtransfn;
     const char *aggmfinalfn;
+    const char *partialaggfn;
+    const char *partialagg_minversion;
     bool aggfinalextra;
     bool aggmfinalextra;
     char aggfinalmodify;
@@ -13775,11 +13777,19 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
     if (fout->remoteVersion >= 110000)
         appendPQExpBufferStr(query,
                              "aggfinalmodify,\n"
-                             "aggmfinalmodify\n");
+                             "aggmfinalmodify,\n");
     else
         appendPQExpBufferStr(query,
                              "'0' AS aggfinalmodify,\n"
-                             "'0' AS aggmfinalmodify\n");
+                             "'0' AS aggmfinalmodify,\n");
+    if (fout->remoteVersion >= 160000)
+        appendPQExpBufferStr(query,
+                             "partialaggfn,\n"
+                             "partialagg_minversion\n");
+    else
+        appendPQExpBufferStr(query,
+                             "'-' AS partialaggfn,\n"
+                             "0 AS partialagg_minversion\n");
     appendPQExpBufferStr(query,
                          "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
@@ -13805,6 +13815,8 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
     aggcombinefn = PQgetvalue(res, 0, PQfnumber(res, "aggcombinefn"));
     aggserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggserialfn"));
     aggdeserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggdeserialfn"));
+    partialaggfn = PQgetvalue(res, 0, PQfnumber(res, "partialaggfn"));
+    partialagg_minversion = PQgetvalue(res, 0, PQfnumber(res, "partialagg_minversion"));
     aggmtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggmtransfn"));
     aggminvtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggminvtransfn"));
     aggmfinalfn = PQgetvalue(res, 0, PQfnumber(res, "aggmfinalfn"));
@@ -13894,6 +13906,15 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
     if (strcmp(aggdeserialfn, "-") != 0)
         appendPQExpBuffer(details, ",\n DESERIALFUNC = %s", aggdeserialfn);
+    if (strcmp(partialaggfn, "-") != 0)
+        appendPQExpBuffer(details, ",\n PARTIALAGGFUNC = %s", partialaggfn);
+
+    if (strcmp(partialagg_minversion, "0") != 0)
+    {
+        appendPQExpBuffer(details, ",\n PARTIALAGG_MINVERSION = %s",
+                          partialagg_minversion);
+    }
+
     if (strcmp(aggmtransfn, "-") != 0)
     {
         appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s",
diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat
index 283f494bf5..689ed3546a 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -13,22 +13,44 @@
 [
 # avg
+{ aggfnoid => 'avg_p_int8', aggtransfn => 'int8_avg_accum',
+  aggfinalfn => 'int8_avg_serialize', aggcombinefn => 'int8_avg_combine',
+  aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
+  aggtranstype => 'internal',
+  aggtransspace => '48' },
 { aggfnoid => 'avg(int8)', aggtransfn => 'int8_avg_accum',
   aggfinalfn => 'numeric_poly_avg', aggcombinefn => 'int8_avg_combine',
   aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
   aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
   aggmfinalfn => 'numeric_poly_avg', aggtranstype => 'internal',
-  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+  partialaggfn => 'avg_p_int8', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_int4', aggtransfn => 'int4_avg_accum',
+  aggcombinefn => 'int4_avg_combine',
+  aggtranstype => '_int8',
+  agginitval => '{0,0}' },
 { aggfnoid => 'avg(int4)', aggtransfn => 'int4_avg_accum',
   aggfinalfn => 'int8_avg', aggcombinefn => 'int4_avg_combine',
   aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
   aggmfinalfn => 'int8_avg', aggtranstype => '_int8', aggmtranstype => '_int8',
-  agginitval => '{0,0}', aggminitval => '{0,0}' },
+  agginitval => '{0,0}', aggminitval => '{0,0}',
+  partialaggfn => 'avg_p_int4', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_int2', aggtransfn => 'int2_avg_accum',
+  aggcombinefn => 'int4_avg_combine',
+  aggtranstype => '_int8',
+  agginitval => '{0,0}' },
 { aggfnoid => 'avg(int2)', aggtransfn => 'int2_avg_accum',
   aggfinalfn => 'int8_avg', aggcombinefn => 'int4_avg_combine',
   aggmtransfn => 'int2_avg_accum', aggminvtransfn => 'int2_avg_accum_inv',
   aggmfinalfn => 'int8_avg', aggtranstype => '_int8', aggmtranstype => '_int8',
-  agginitval => '{0,0}', aggminitval => '{0,0}' },
+  agginitval => '{0,0}', aggminitval => '{0,0}',
+  partialaggfn => 'avg_p_int2', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_numeric', aggtransfn => 'numeric_avg_accum',
+  aggfinalfn => 'numeric_avg_serialize', aggcombinefn => 'numeric_avg_combine',
+  aggserialfn => 'numeric_avg_serialize',
+  aggdeserialfn => 'numeric_avg_deserialize',
+  aggtranstype => 'internal', aggtransspace => '128',
+  partialaggfn => 'avg_p_numeric', partialagg_minversion => '160000' },
 { aggfnoid => 'avg(numeric)', aggtransfn => 'numeric_avg_accum',
   aggfinalfn => 'numeric_avg', aggcombinefn => 'numeric_avg_combine',
   aggserialfn => 'numeric_avg_serialize',
@@ -36,27 +58,45 @@
   aggmtransfn => 'numeric_avg_accum', aggminvtransfn => 'numeric_accum_inv',
   aggmfinalfn => 'numeric_avg', aggtranstype => 'internal',
   aggtransspace => '128', aggmtranstype => 'internal',
-  aggmtransspace => '128' },
+  aggmtransspace => '128',
+  partialaggfn => 'avg_p_numeric', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_float4', aggtransfn => 'float4_accum',
+  aggcombinefn => 'float8_combine',
+  aggtranstype => '_float8', agginitval => '{0,0,0}' },
 { aggfnoid => 'avg(float4)', aggtransfn => 'float4_accum',
   aggfinalfn => 'float8_avg', aggcombinefn => 'float8_combine',
+  aggtranstype => '_float8', agginitval => '{0,0,0}',
+  partialaggfn => 'avg_p_float4', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_float8', aggtransfn => 'float8_accum',
+  aggcombinefn => 'float8_combine',
   aggtranstype => '_float8', agginitval => '{0,0,0}' },
 { aggfnoid => 'avg(float8)', aggtransfn => 'float8_accum',
   aggfinalfn => 'float8_avg', aggcombinefn => 'float8_combine',
-  aggtranstype => '_float8', agginitval => '{0,0,0}' },
+  aggtranstype => '_float8', agginitval => '{0,0,0}',
+  partialaggfn => 'avg_p_float8', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_interval', aggtransfn => 'interval_accum',
+  aggcombinefn => 'interval_combine',
+  aggtranstype => '_interval', agginitval => '{0 second,0 second}' },
 { aggfnoid => 'avg(interval)', aggtransfn => 'interval_accum',
   aggfinalfn => 'interval_avg', aggcombinefn => 'interval_combine',
   aggmtransfn => 'interval_accum', aggminvtransfn => 'interval_accum_inv',
   aggmfinalfn => 'interval_avg', aggtranstype => '_interval',
   aggmtranstype => '_interval', agginitval => '{0 second,0 second}',
-  aggminitval => '{0 second,0 second}' },
+  aggminitval => '{0 second,0 second}',
+  partialaggfn => 'avg_p_interval', partialagg_minversion => '160000' },
 # sum
+{ aggfnoid => 'sum_p_int8', aggtransfn => 'int8_avg_accum',
+  aggfinalfn => 'int8_avg_serialize', aggcombinefn => 'int8_avg_combine',
+  aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
+  aggtranstype => 'internal', aggtransspace => '48' },
 { aggfnoid => 'sum(int8)', aggtransfn => 'int8_avg_accum',
   aggfinalfn => 'numeric_poly_sum', aggcombinefn => 'int8_avg_combine',
   aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
   aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
   aggmfinalfn => 'numeric_poly_sum', aggtranstype => 'internal',
-  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+  partialaggfn => 'sum_p_int8', partialagg_minversion => '160000' },
 { aggfnoid => 'sum(int4)', aggtransfn => 'int4_sum', aggcombinefn => 'int8pl',
   aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
   aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
@@ -76,6 +116,11 @@
   aggcombinefn => 'interval_pl', aggmtransfn => 'interval_pl',
   aggminvtransfn => 'interval_mi', aggtranstype => 'interval',
   aggmtranstype => 'interval' },
+{ aggfnoid => 'sum_p_numeric', aggtransfn => 'numeric_avg_accum',
+  aggfinalfn => 'numeric_avg_serialize', aggcombinefn => 'numeric_avg_combine',
+  aggserialfn => 'numeric_avg_serialize',
+  aggdeserialfn => 'numeric_avg_deserialize',
+  aggtranstype => 'internal', aggtransspace => '128' },
 { aggfnoid => 'sum(numeric)', aggtransfn => 'numeric_avg_accum',
   aggfinalfn => 'numeric_sum', aggcombinefn => 'numeric_avg_combine',
   aggserialfn => 'numeric_avg_serialize',
@@ -83,7 +128,8 @@
   aggmtransfn => 'numeric_avg_accum', aggminvtransfn => 'numeric_accum_inv',
   aggmfinalfn => 'numeric_sum', aggtranstype => 'internal',
   aggtransspace => '128', aggmtranstype => 'internal',
-  aggmtransspace => '128' },
+  aggmtransspace => '128',
+  partialaggfn => 'sum_p_numeric', partialagg_minversion => '160000' },
 # max
 { aggfnoid => 'max(int8)', aggtransfn => 'int8larger',
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 3112881193..1cefa1bfba 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -55,6 +55,12 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
     /* function to convert bytea to transtype (0 if none) */
     regproc aggdeserialfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+    /* special aggregate function for partial aggregation (0 if none) */
+    regproc partialaggfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+
+    /* minimum PostgreSQL's version which has compatible partialaggfn */
+    int32 partialagg_minversion BKI_DEFAULT(0);
+
     /* forward function for moving-aggregate mode (0 if none) */
     regproc aggmtransfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
@@ -141,6 +147,9 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_aggregate_fnoid_index, 2650, AggregateFnoidIndexId,
 #define AGGMODIFY_SHAREABLE 's'
 #define AGGMODIFY_READ_WRITE 'w'
+/* Symbolic value for default partialagg_minversion */
+#define PARTIALAGG_MINVERSION_DEFAULT 0
+
 #endif /* EXPOSE_TO_CLIENT_CODE */
@@ -164,6 +173,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
                                      List *aggmtransfnName,
                                      List *aggminvtransfnName,
                                      List *aggmfinalfnName,
+                                     List *aggpartialfnName,
                                      bool finalfnExtraArgs,
                                      bool mfinalfnExtraArgs,
                                      char finalfnModify,
@@ -173,6 +183,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
                                      int32 aggTransSpace,
                                      Oid aggmTransType,
                                      int32 aggmTransSpace,
+                                     int32 partialaggMinversion,
                                      const char *agginitval,
                                      const char *aggminitval,
                                      char proparallel);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f2642201..2f27b54133 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6594,35 +6594,67 @@
   descr => 'the average (arithmetic mean) as numeric of all bigint values',
   proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
   proargtypes => 'int8', prosrc => 'aggregate_dummy' },
+{ oid => '13500',
+  descr => 'the partial average (arithmetic mean) as numeric of all bigint values',
+  proname => 'avg_p_int8', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+  proargtypes => 'int8', prosrc => 'aggregate_dummy' },
 { oid => '2101',
   descr => 'the average (arithmetic mean) as numeric of all integer values',
   proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
   proargtypes => 'int4', prosrc => 'aggregate_dummy' },
+{ oid => '13501',
+  descr => 'the partial average (arithmetic mean) as numeric of all integer values',
+  proname => 'avg_p_int4', prokind => 'a', proisstrict => 'f', prorettype => '_int8',
+  proargtypes => 'int4', prosrc => 'aggregate_dummy' },
 { oid => '2102',
   descr => 'the average (arithmetic mean) as numeric of all smallint values',
   proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
   proargtypes => 'int2', prosrc => 'aggregate_dummy' },
+{ oid => '13502',
+  descr => 'the partial average (arithmetic mean) as numeric of all smallint values',
+  proname => 'avg_p_int2', prokind => 'a', proisstrict => 'f', prorettype => '_int8',
+  proargtypes => 'int2', prosrc => 'aggregate_dummy' },
 { oid => '2103',
   descr => 'the average (arithmetic mean) as numeric of all numeric values',
   proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
   proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
+{ oid => '13503',
+  descr => 'the partial average (arithmetic mean) as numeric of all numeric values',
+  proname => 'avg_p_numeric', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+  proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
 { oid => '2104',
   descr => 'the average (arithmetic mean) as float8 of all float4 values',
   proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'float8',
   proargtypes => 'float4', prosrc => 'aggregate_dummy' },
+{ oid => '13504',
+  descr => 'the partial average (arithmetic mean) as float8 of all float4 values',
+  proname => 'avg_p_float4', prokind => 'a', proisstrict => 'f', prorettype => '_float8',
+  proargtypes => 'float4', prosrc => 'aggregate_dummy' },
 { oid => '2105',
   descr => 'the average (arithmetic mean) as float8 of all float8 values',
   proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'float8',
   proargtypes => 'float8', prosrc => 'aggregate_dummy' },
+{ oid => '13505',
+  descr => 'the partial average (arithmetic mean) as float8 of all float8 values',
+  proname => 'avg_p_float8', prokind => 'a', proisstrict => 'f', prorettype => '_float8',
+  proargtypes => 'float8', prosrc => 'aggregate_dummy' },
 { oid => '2106',
   descr => 'the average (arithmetic mean) as interval of all interval values',
   proname => 'avg', prokind => 'a', proisstrict => 'f',
   prorettype => 'interval', proargtypes => 'interval',
   prosrc => 'aggregate_dummy' },
+{ oid => '13506',
+  descr => 'the partial average (arithmetic mean) as interval of all interval values',
+  proname => 'avg_p_interval', prokind => 'a', proisstrict => 'f',
+  prorettype => '_interval', proargtypes => 'interval',
+  prosrc => 'aggregate_dummy' },
 { oid => '2107', descr => 'sum as numeric across all bigint input values',
   proname => 'sum', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
   proargtypes => 'int8', prosrc => 'aggregate_dummy' },
+{ oid => '13507', descr => 'partial sum as numeric across all bigint input values',
+  proname => 'sum_p_int8', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+  proargtypes => 'int8', prosrc => 'aggregate_dummy' },
 { oid => '2108', descr => 'sum as bigint across all integer input values',
   proname => 'sum', prokind => 'a', proisstrict => 'f', prorettype => 'int8',
   proargtypes => 'int4', prosrc => 'aggregate_dummy' },
@@ -6645,6 +6677,9 @@
 { oid => '2114', descr => 'sum as numeric across all numeric input values',
   proname => 'sum', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
   proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
+{ oid => '13508', descr => 'partial sum as numeric across all numeric input values',
+  proname => 'sum_p_numeric', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+  proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
 { oid => '2115', descr => 'maximum value of all bigint input values',
   proname => 'max', prokind => 'a', proisstrict => 'f', prorettype => 'int8',
diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out
index dcf6909423..3197c8abba 100644
--- a/src/test/regress/expected/create_aggregate.out
+++ b/src/test/regress/expected/create_aggregate.out
@@ -322,3 +322,27 @@ WARNING: aggregate attribute "Finalfunc_extra" not recognized
 WARNING: aggregate attribute "Finalfunc_modify" not recognized
 WARNING: aggregate attribute "Parallel" not recognized
 ERROR: aggregate stype must be specified
+-- invalid: partialaggfunc which doesn't exist
+CREATE AGGREGATE haspartialagg_agg(int8) (
+    sfunc = int8_avg_accum,
+    stype = internal,
+    combinefunc = int8_avg_combine,
+    finalfunc = numeric_poly_sum,
+    serialfunc = int8_avg_serialize,
+    deserialfunc = int8_avg_deserialize,
+    partialaggfunc = partialagg_foo,
+    partialagg_minversion = 160000
+);
+ERROR: function partialagg_foo(bigint) does not exist
+-- invalid: partialagg_minversion is not integer
+CREATE AGGREGATE haspartialagg_agg(int8) (
+    sfunc = int8_avg_accum,
+    stype = internal,
+    combinefunc = int8_avg_combine,
+    finalfunc = numeric_poly_sum,
+    serialfunc = int8_avg_serialize,
+    deserialfunc = int8_avg_deserialize,
+    partialaggfunc = partialagg_foo,
+    partialagg_minversion = aaa
+);
+ERROR: partialagg_minversion requires an integer value
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..49f2c9bc35 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -145,6 +145,7 @@ NOTICE: checking pg_aggregate {aggfinalfn} => pg_proc {oid}
 NOTICE: checking pg_aggregate {aggcombinefn} => pg_proc {oid}
 NOTICE: checking pg_aggregate {aggserialfn} => pg_proc {oid}
 NOTICE: checking pg_aggregate {aggdeserialfn} => pg_proc {oid}
+NOTICE: checking pg_aggregate {partialaggfn} => pg_proc {oid}
 NOTICE: checking pg_aggregate {aggmtransfn} => pg_proc {oid}
 NOTICE: checking pg_aggregate {aggminvtransfn} => pg_proc {oid}
 NOTICE: checking pg_aggregate {aggmfinalfn} => pg_proc {oid}
diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql
index d4b4036fd7..499c120ca5 100644
--- a/src/test/regress/sql/create_aggregate.sql
+++ b/src/test/regress/sql/create_aggregate.sql
@@ -328,3 +328,27 @@ CREATE AGGREGATE case_agg(float8)
     "Finalfunc_modify" = read_write,
     "Parallel" = safe
 );
+
+-- invalid: partialaggfunc which doesn't exist
+CREATE AGGREGATE haspartialagg_agg(int8) (
+    sfunc = int8_avg_accum,
+    stype = internal,
+    combinefunc = int8_avg_combine,
+    finalfunc = numeric_poly_sum,
+    serialfunc = int8_avg_serialize,
+    deserialfunc = int8_avg_deserialize,
+    partialaggfunc = partialagg_foo,
+    partialagg_minversion = 160000
+);
+
+-- invalid: partialagg_minversion is not integer
+CREATE AGGREGATE haspartialagg_agg(int8) (
+    sfunc = int8_avg_accum,
+    stype = internal,
+    combinefunc = int8_avg_combine,
+    finalfunc = numeric_poly_sum,
+    serialfunc = int8_avg_serialize,
+    deserialfunc = int8_avg_deserialize,
+    partialaggfunc = partialagg_foo,
+    partialagg_minversion = aaa
+);
\ No newline at end of file
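
For anyone reading the patch above, the pushdown rule that the postgres_fdw tests exercise can be sketched as a small stand-alone C function. This is only an illustration of the rule as the test comments state it, not code from the patch: the PartialAggInfo struct and partial_agg_pushdown_ok() are hypothetical names, and treating an unset server_version (0) as too old is an assumption based on the patch initializing fpinfo->server_version to 0.

```c
#include <stdbool.h>

/*
 * Hypothetical, simplified view of the two pg_aggregate columns the patch
 * adds.  The real catalog stores partialaggfn as a regproc.
 */
typedef struct PartialAggInfo
{
    unsigned int partialaggfn;          /* 0 stands in for InvalidOid ("none") */
    int          partialagg_minversion; /* e.g. 160000 */
} PartialAggInfo;

/*
 * Sketch of the decision described by the test comments:
 *  - no HAVING clause together with partial aggregates,
 *  - no DISTINCT inside the aggregate,
 *  - the aggregate must name a partialaggfn,
 *  - the remote server_version must not be older than partialagg_minversion.
 * server_version == 0 (option not set) fails the last check; that is an
 * assumption of this sketch.
 */
bool
partial_agg_pushdown_ok(const PartialAggInfo *agg,
                        int server_version,
                        bool has_having,
                        bool has_distinct)
{
    if (has_having || has_distinct)
        return false;
    if (agg->partialaggfn == 0)
        return false;
    if (server_version < agg->partialagg_minversion)
        return false;
    return true;
}
```

With these assumptions, an aggregate whose partialagg_minversion is 160000 is rejected when server_version is 159999, while one declared with PARTIALAGG_MINVERSION = 150000 is still accepted, which matches the two cases the regression test sets up with f_alter_server_version('loopback', 'set', -1).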