On Fri, Mar 31, 2023 at 05:49:21AM +0000,
[email protected] wrote:
> Hi Mr.Momjian
>
> > First, am I correct?
> Yes, you are correct. This patch uses new special aggregate functions for
> partial aggregate
> (then we call this partialaggfunc).
First, my apologies for not addressing this sooner. I was so focused on
my own tasks that I didn't realize this very important patch was not
getting attention. I will try my best to get it into PG 17.
What amazes me is that you didn't need to create _any_ actual aggregate
functions. Rather, you just needed to hook existing functions into the
aggregate tables for partial FDW execution.
> > Second, how far away is this from being committable
> > and/or what work needs to be done to get it committable, either for PG 16
> > or 17?
> I believe there are three: 1. and 2. are not clear if they are necessary or
> not; 3. are clearly necessary.
> I would like to hear the opinions of the development community on whether or
> not 1. and 2. need to be addressed.
>
> 1. Making partialaggfunc user-defined function
> In v17, I make partialaggfuncs as built-in functions.
> Because of this approach, v17 changes specification of BKI file and
> pg_aggregate.
> For now, partialaggfuncs are needed by only postgres_fdw which is just an
> extension of PostgreSQL.
> In the future, when revising the specifications for BKI files and
> pg_aggregate when modifying existing PostgreSQL functions,
> It is necessary to align them with this patch's changes.
> I am concerned that this may be undesirable.
> So I am thinking that v17 should be modified to making partialaggfunc as user
> defined function.
I think we have three possible cases for aggregates pushdown to FDWs:
1) Postgres built-in aggregate functions
2) Postgres user-defined & extension aggregate functions
3) aggregate functions calls to non-PG FDWs
Your patch handles #1 by checking that the FDW Postgres version is the
same as the calling Postgres version. However, it doesn't check for
extension versions, and frankly, I don't see how we could implement that
cleanly without significant complexity.
I suggest we remove the version check requirement --- instead just
document that the FDW Postgres version should be the same or newer than
the calling Postgres server --- that way, we can assume that whatever is
in the system catalogs of the caller is in the receiving side. We
should add a GUC to turn off this optimization for cases where the FDW
Postgres version is older than the caller. This handles case 1-2.
For case 3, I don't even know how much pushdown those do of _any_
aggregates to non-PG servers, let along parallel FDW ones. Does anyone
know the details?
> 2. Automation of creating definition of partialaggfuncs
> In development of v17, I manually create definition of partialaggfuncs for
> avg, min, max, sum, count.
> I am concerned that this may be undesirable.
> So I am thinking that v17 should be modified to automate creating definition
> of partialaggfuncs
> for all built-in aggregate functions.
Are there any other builtin functions that need this? I think we can
just provide documention for extensions on how to do this.
> 3. Documentation
> I need add explanation of partialaggfunc to documents on postgres_fdw and
> other places.
I can help with that once we decide on the above.
I think 'partialaggfn' should be named 'aggpartialfn' to match other
columns in pg_aggregate.
I am confused by these changes to pg_aggegate:
+{ aggfnoid => 'sum_p_int8', aggtransfn => 'int8_avg_accum',
+ aggfinalfn => 'int8_avg_serialize', aggcombinefn => 'int8_avg_combine',
+ aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
+ aggtranstype => 'internal', aggtransspace => '48' },
...
+{ aggfnoid => 'sum_p_numeric', aggtransfn => 'numeric_avg_accum',
+ aggfinalfn => 'numeric_avg_serialize', aggcombinefn => 'numeric_avg_combine',
+ aggserialfn => 'numeric_avg_serialize',
+ aggdeserialfn => 'numeric_avg_deserialize',
+ aggtranstype => 'internal', aggtransspace => '128' },
Why are these marked as 'sum' but use 'avg' functions?
It would be good to explain exactly how this is diffent from background
worker parallelism.
--
Bruce Momjian <[email protected]> https://momjian.us
EDB https://enterprisedb.com
Embrace your flaws. They make you human, rather than perfect,
which you will never be.
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 09d6dd60dd..fe219b8d2f 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -202,7 +202,7 @@ static bool is_subquery_var(Var *node, RelOptInfo *foreignrel,
int *relno, int *colno);
static void get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
int *relno, int *colno);
-
+static bool partial_agg_ok(Aggref* agg, PgFdwRelationInfo* fpinfo);
/*
* Examine each qual clause in input_conds, and classify them into two groups,
@@ -907,8 +907,9 @@ foreign_expr_walker(Node *node,
if (!IS_UPPER_REL(glob_cxt->foreignrel))
return false;
- /* Only non-split aggregates are pushable. */
- if (agg->aggsplit != AGGSPLIT_SIMPLE)
+ if ((agg->aggsplit != AGGSPLIT_SIMPLE) && (agg->aggsplit != AGGSPLIT_INITIAL_SERIAL))
+ return false;
+ if (agg->aggsplit == AGGSPLIT_INITIAL_SERIAL && !partial_agg_ok(agg, fpinfo))
return false;
/* As usual, it must be shippable. */
@@ -3517,14 +3518,37 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
StringInfo buf = context->buf;
bool use_variadic;
- /* Only basic, non-split aggregation accepted. */
- Assert(node->aggsplit == AGGSPLIT_SIMPLE);
+
+ Assert((node->aggsplit == AGGSPLIT_SIMPLE) ||
+ (node->aggsplit == AGGSPLIT_INITIAL_SERIAL));
/* Check if need to print VARIADIC (cf. ruleutils.c) */
use_variadic = node->aggvariadic;
- /* Find aggregate name from aggfnoid which is a pg_proc entry */
- appendFunctionName(node->aggfnoid, context);
+ if (node->aggsplit == AGGSPLIT_SIMPLE) {
+ /* Find aggregate name from aggfnoid which is a pg_proc entry */
+ appendFunctionName(node->aggfnoid, context);
+ }
+ else {
+ HeapTuple aggtup;
+ Form_pg_aggregate aggform;
+
+ /* Find aggregate name from aggfnoid or partialaggfn which is a pg_proc entry */
+ aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(node->aggfnoid));
+ if (!HeapTupleIsValid(aggtup))
+ elog(ERROR, "cache lookup failed for aggregate %u", node->aggfnoid);
+ aggform = (Form_pg_aggregate)GETSTRUCT(aggtup);
+
+ if ((aggform->aggtranstype != INTERNALOID) && (aggform->aggfinalfn == InvalidOid)) {
+ appendFunctionName(node->aggfnoid, context);
+ } else if(aggform->partialaggfn) {
+ appendFunctionName(aggform->partialaggfn, context);
+ } else {
+ elog(ERROR, "there is no partialaggfn %u", node->aggfnoid);
+ }
+ ReleaseSysCache(aggtup);
+ }
+
appendStringInfoChar(buf, '(');
/* Add DISTINCT */
@@ -4047,3 +4071,62 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
/* Shouldn't get here */
elog(ERROR, "unexpected expression in subquery output");
}
+
+/*
+ * Check that partial aggregate function, described by aggform,
+ * exists on remote server, described by fpinfo.
+ */
+static bool
+partial_agg_compatible(Form_pg_aggregate aggform, PgFdwRelationInfo *fpinfo)
+{
+ int32 partialagg_minversion = PG_VERSION_NUM;
+ if (aggform->partialagg_minversion != PARTIALAGG_MINVERSION_DEFAULT) {
+ partialagg_minversion = aggform->partialagg_minversion;
+ }
+ return (fpinfo->server_version >= partialagg_minversion);
+}
+
+/*
+ * Check that partial aggregate agg is fine to push down.
+ *
+ * It is fine when all of the following conditions are true.
+ * condition1) agg is AGGKIND_NORMAL aggregate which contains no distinct
+ * or order by clauses
+ * condition2) there is an aggregate function for partial aggregation
+ * (then we call this partialaggfunc) corresponding to agg.
+ * condition2 is true when either of the following cases is true.
+ * case2-1) return type of agg is not internal and agg has no finalfunc.
+ * In this case, partialaggfunc is agg itself.
+ * case2-2) agg has valid partialaggfn and partialagg_minversion <= server_version.
+ * In this case, partialaggfunc is a aggregate function whose oid is partialaggfn.
+ */
+static bool
+partial_agg_ok(Aggref* agg, PgFdwRelationInfo *fpinfo)
+{
+ HeapTuple aggtup;
+ Form_pg_aggregate aggform;
+ bool partial_agg_ok = true;
+
+ Assert(agg->aggsplit == AGGSPLIT_INITIAL_SERIAL);
+
+ /* We don't support complex partial aggregates */
+ if (agg->aggdistinct || agg->aggkind != AGGKIND_NORMAL || agg->aggorder != NIL)
+ return false;
+
+ aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(agg->aggfnoid));
+ if (!HeapTupleIsValid(aggtup))
+ elog(ERROR, "cache lookup failed for aggregate %u", agg->aggfnoid);
+ aggform = (Form_pg_aggregate)GETSTRUCT(aggtup);
+
+ if ((aggform->aggtranstype == INTERNALOID) || (aggform->aggfinalfn != InvalidOid)) {
+ /* Only aggregates which has partialaggfn, are allowed */
+ if (!aggform->partialaggfn) {
+ partial_agg_ok = false;
+ } else if (!partial_agg_compatible(aggform, fpinfo)) {
+ partial_agg_ok = false;
+ }
+ }
+
+ ReleaseSysCache(aggtup);
+ return partial_agg_ok;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 8f6a04f71b..ebdb7cff3b 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9677,13 +9677,34 @@ RESET enable_partitionwise_join;
-- ===================================================================
-- test partitionwise aggregates
-- ===================================================================
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE OR REPLACE FUNCTION f_server_version_lag(version_offset int4) RETURNS text AS $$
+ SELECT (setting::int4 + version_offset)::text FROM pg_settings WHERE name = 'server_version_num';
+$$ LANGUAGE sql;
+CREATE OR REPLACE FUNCTION f_alter_server_version(server_name text, operation text, version_offset int4) RETURNS void AS $$
+ DECLARE
+ version_num text;
+ BEGIN
+ EXECUTE 'SELECT f_server_version_lag($1) '
+ INTO version_num
+ USING version_offset;
+ EXECUTE format('ALTER SERVER %I OPTIONS (%I server_version %L)',
+ server_name, operation, version_num);
+ RETURN;
+ END;
+$$ LANGUAGE plpgsql;
+SELECT f_alter_server_version('loopback', 'add', 0);
+ f_alter_server_version
+------------------------
+
+(1 row)
+
+CREATE TABLE pagg_tab (a int, b int, c text, d int4) PARTITION BY RANGE(a);
CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
-- Create foreign partitions
CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
CREATE FOREIGN TABLE fpagg_tab_p2 PARTITION OF pagg_tab FOR VALUES FROM (10) TO (20) SERVER loopback OPTIONS (table_name 'pagg_tab_p2');
@@ -9742,8 +9763,8 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
-- Should have all the columns in the target list for the given relation
EXPLAIN (VERBOSE, COSTS OFF)
SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
- QUERY PLAN
-------------------------------------------------------------------------
+ QUERY PLAN
+---------------------------------------------------------------------------
Sort
Output: t1.a, (count(((t1.*)::pagg_tab)))
Sort Key: t1.a
@@ -9754,21 +9775,21 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
Filter: (avg(t1.b) < '22'::numeric)
-> Foreign Scan on public.fpagg_tab_p1 t1
Output: t1.a, t1.*, t1.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p1
+ Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p1
-> HashAggregate
Output: t1_1.a, count(((t1_1.*)::pagg_tab))
Group Key: t1_1.a
Filter: (avg(t1_1.b) < '22'::numeric)
-> Foreign Scan on public.fpagg_tab_p2 t1_1
Output: t1_1.a, t1_1.*, t1_1.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p2
+ Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p2
-> HashAggregate
Output: t1_2.a, count(((t1_2.*)::pagg_tab))
Group Key: t1_2.a
Filter: (avg(t1_2.b) < '22'::numeric)
-> Foreign Scan on public.fpagg_tab_p3 t1_2
Output: t1_2.a, t1_2.*, t1_2.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p3
+ Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p3
(25 rows)
SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
@@ -9804,6 +9825,263 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700
-> Foreign Scan on fpagg_tab_p3 pagg_tab_2
(15 rows)
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------
+ Sort
+ Output: pagg_tab.b, (max(pagg_tab.a)), (count(*))
+ Sort Key: pagg_tab.b
+ -> Finalize HashAggregate
+ Output: pagg_tab.b, max(pagg_tab.a), count(*)
+ Group Key: pagg_tab.b
+ Filter: (sum(pagg_tab.a) < 700)
+ -> Append
+ -> Partial HashAggregate
+ Output: pagg_tab.b, PARTIAL max(pagg_tab.a), PARTIAL count(*), PARTIAL sum(pagg_tab.a)
+ Group Key: pagg_tab.b
+ -> Foreign Scan on public.fpagg_tab_p1 pagg_tab
+ Output: pagg_tab.b, pagg_tab.a
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p1
+ -> Partial HashAggregate
+ Output: pagg_tab_1.b, PARTIAL max(pagg_tab_1.a), PARTIAL count(*), PARTIAL sum(pagg_tab_1.a)
+ Group Key: pagg_tab_1.b
+ -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_1
+ Output: pagg_tab_1.b, pagg_tab_1.a
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p2
+ -> Partial HashAggregate
+ Output: pagg_tab_2.b, PARTIAL max(pagg_tab_2.a), PARTIAL count(*), PARTIAL sum(pagg_tab_2.a)
+ Group Key: pagg_tab_2.b
+ -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_2
+ Output: pagg_tab_2.b, pagg_tab_2.a
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p3
+(26 rows)
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Sort
+ Output: pagg_tab.b, (min(pagg_tab.a)), (max(pagg_tab.a)), (count(*)), (sum(pagg_tab.d)), (sum((pagg_tab.d)::bigint)), (avg(pagg_tab.d)), (avg((pagg_tab.d)::bigint))
+ Sort Key: pagg_tab.b
+ -> Finalize HashAggregate
+ Output: pagg_tab.b, min(pagg_tab.a), max(pagg_tab.a), count(*), sum(pagg_tab.d), sum((pagg_tab.d)::bigint), avg(pagg_tab.d), avg((pagg_tab.d)::bigint)
+ Group Key: pagg_tab.b
+ -> Append
+ -> Foreign Scan
+ Output: pagg_tab.b, (PARTIAL min(pagg_tab.a)), (PARTIAL max(pagg_tab.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab.d)), (PARTIAL sum((pagg_tab.d)::bigint)), (PARTIAL avg(pagg_tab.d)), (PARTIAL avg((pagg_tab.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+ Remote SQL: SELECT b, min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p1 GROUP BY 1
+ -> Foreign Scan
+ Output: pagg_tab_1.b, (PARTIAL min(pagg_tab_1.a)), (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_1.d)), (PARTIAL sum((pagg_tab_1.d)::bigint)), (PARTIAL avg(pagg_tab_1.d)), (PARTIAL avg((pagg_tab_1.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+ Remote SQL: SELECT b, min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p2 GROUP BY 1
+ -> Foreign Scan
+ Output: pagg_tab_2.b, (PARTIAL min(pagg_tab_2.a)), (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_2.d)), (PARTIAL sum((pagg_tab_2.d)::bigint)), (PARTIAL avg(pagg_tab_2.d)), (PARTIAL avg((pagg_tab_2.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+ Remote SQL: SELECT b, min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p3 GROUP BY 1
+(19 rows)
+
+SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1;
+ b | min | max | count | sum | sum | avg | avg
+----+-----+-----+-------+------+------+---------------------+---------------------
+ 0 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000
+ 1 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000
+ 2 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000
+ 3 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000
+ 4 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000
+ 5 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000
+ 6 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000
+ 7 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000
+ 8 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000
+ 9 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000
+ 10 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000
+ 11 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000
+ 12 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000
+ 13 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000
+ 14 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000
+ 15 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000
+ 16 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000
+ 17 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000
+ 18 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000
+ 19 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000
+ 20 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000
+ 21 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000
+ 22 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000
+ 23 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000
+ 24 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000
+ 25 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000
+ 26 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000
+ 27 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000
+ 28 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000
+ 29 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000
+ 30 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000
+ 31 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000
+ 32 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000
+ 33 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000
+ 34 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000
+ 35 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000
+ 36 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000
+ 37 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000
+ 38 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000
+ 39 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000
+ 40 | 0 | 20 | 60 | 900 | 900 | 15.0000000000000000 | 15.0000000000000000
+ 41 | 1 | 21 | 60 | 960 | 960 | 16.0000000000000000 | 16.0000000000000000
+ 42 | 2 | 22 | 60 | 1020 | 1020 | 17.0000000000000000 | 17.0000000000000000
+ 43 | 3 | 23 | 60 | 1080 | 1080 | 18.0000000000000000 | 18.0000000000000000
+ 44 | 4 | 24 | 60 | 1140 | 1140 | 19.0000000000000000 | 19.0000000000000000
+ 45 | 5 | 25 | 60 | 1200 | 1200 | 20.0000000000000000 | 20.0000000000000000
+ 46 | 6 | 26 | 60 | 1260 | 1260 | 21.0000000000000000 | 21.0000000000000000
+ 47 | 7 | 27 | 60 | 1320 | 1320 | 22.0000000000000000 | 22.0000000000000000
+ 48 | 8 | 28 | 60 | 1380 | 1380 | 23.0000000000000000 | 23.0000000000000000
+ 49 | 9 | 29 | 60 | 1440 | 1440 | 24.0000000000000000 | 24.0000000000000000
+(50 rows)
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab;
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+ Output: min(pagg_tab.a), max(pagg_tab.a), count(*), sum(pagg_tab.d), sum((pagg_tab.d)::bigint), avg(pagg_tab.d), avg((pagg_tab.d)::bigint)
+ -> Append
+ -> Foreign Scan
+ Output: (PARTIAL min(pagg_tab.a)), (PARTIAL max(pagg_tab.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab.d)), (PARTIAL sum((pagg_tab.d)::bigint)), (PARTIAL avg(pagg_tab.d)), (PARTIAL avg((pagg_tab.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+ Remote SQL: SELECT min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p1
+ -> Foreign Scan
+ Output: (PARTIAL min(pagg_tab_1.a)), (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_1.d)), (PARTIAL sum((pagg_tab_1.d)::bigint)), (PARTIAL avg(pagg_tab_1.d)), (PARTIAL avg((pagg_tab_1.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+ Remote SQL: SELECT min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p2
+ -> Foreign Scan
+ Output: (PARTIAL min(pagg_tab_2.a)), (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_2.d)), (PARTIAL sum((pagg_tab_2.d)::bigint)), (PARTIAL avg(pagg_tab_2.d)), (PARTIAL avg((pagg_tab_2.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+ Remote SQL: SELECT min(a), max(a), count(*), sum(d), sum_p_int8(d::bigint), avg_p_int4(d), avg_p_int8(d::bigint) FROM public.pagg_tab_p3
+(15 rows)
+
+SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab;
+ min | max | count | sum | sum | avg | avg
+-----+-----+-------+-------+-------+---------------------+---------------------
+ 0 | 29 | 3000 | 58500 | 58500 | 19.5000000000000000 | 19.5000000000000000
+(1 row)
+
+-- It's unsafe to push down partial aggregates which contains distinct clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------
+ Aggregate
+ Output: max(pagg_tab.a), count(DISTINCT pagg_tab.b)
+ -> Merge Append
+ Sort Key: pagg_tab.b
+ -> Foreign Scan on public.fpagg_tab_p1 pagg_tab_1
+ Output: pagg_tab_1.a, pagg_tab_1.b
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p1 ORDER BY b ASC NULLS LAST
+ -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_2
+ Output: pagg_tab_2.a, pagg_tab_2.b
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p2 ORDER BY b ASC NULLS LAST
+ -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_3
+ Output: pagg_tab_3.a, pagg_tab_3.b
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p3 ORDER BY b ASC NULLS LAST
+(13 rows)
+
+SELECT max(a), count(distinct b) FROM pagg_tab;
+ max | count
+-----+-------
+ 29 | 50
+(1 row)
+
+-- It's unsafe to push down partial aggregates when
+-- server_version is older than partialagg_minversion, described by pg_aggregate
+SELECT f_alter_server_version('loopback', 'set', -1);
+ f_alter_server_version
+------------------------
+
+(1 row)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT avg(d) FROM pagg_tab;
+ QUERY PLAN
+------------------------------------------------------------------
+ Finalize Aggregate
+ Output: avg(pagg_tab.d)
+ -> Append
+ -> Partial Aggregate
+ Output: PARTIAL avg(pagg_tab.d)
+ -> Foreign Scan on public.fpagg_tab_p1 pagg_tab
+ Output: pagg_tab.d
+ Remote SQL: SELECT d FROM public.pagg_tab_p1
+ -> Partial Aggregate
+ Output: PARTIAL avg(pagg_tab_1.d)
+ -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_1
+ Output: pagg_tab_1.d
+ Remote SQL: SELECT d FROM public.pagg_tab_p2
+ -> Partial Aggregate
+ Output: PARTIAL avg(pagg_tab_2.d)
+ -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_2
+ Output: pagg_tab_2.d
+ Remote SQL: SELECT d FROM public.pagg_tab_p3
+(18 rows)
+
+SELECT avg(d) FROM pagg_tab;
+ avg
+---------------------
+ 19.5000000000000000
+(1 row)
+
+-- It's safe to push down partial aggregate for user defined aggregate
+-- which has valid options.
+CREATE AGGREGATE postgres_fdw_sum_p_int8(int8) (
+ SFUNC = int8_avg_accum,
+ STYPE = internal,
+ FINALFUNC = int8_avg_serialize
+);
+CREATE AGGREGATE postgres_fdw_sum(int8) (
+ SFUNC = int8_avg_accum,
+ STYPE = internal,
+ COMBINEFUNC = int8_avg_combine,
+ FINALFUNC = numeric_poly_sum,
+ SERIALFUNC = int8_avg_serialize,
+ DESERIALFUNC = int8_avg_deserialize,
+ PARTIALAGGFUNC = postgres_fdw_sum_p_int8,
+ PARTIALAGG_MINVERSION = 150000
+);
+ALTER EXTENSION postgres_fdw ADD FUNCTION postgres_fdw_sum(int8);
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT postgres_fdw_sum(d::int8) FROM pagg_tab;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+ Output: postgres_fdw_sum((pagg_tab.d)::bigint)
+ -> Append
+ -> Foreign Scan
+ Output: (PARTIAL postgres_fdw_sum((pagg_tab.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+ Remote SQL: SELECT public.postgres_fdw_sum_p_int8(d::bigint) FROM public.pagg_tab_p1
+ -> Foreign Scan
+ Output: (PARTIAL postgres_fdw_sum((pagg_tab_1.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+ Remote SQL: SELECT public.postgres_fdw_sum_p_int8(d::bigint) FROM public.pagg_tab_p2
+ -> Foreign Scan
+ Output: (PARTIAL postgres_fdw_sum((pagg_tab_2.d)::bigint))
+ Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+ Remote SQL: SELECT public.postgres_fdw_sum_p_int8(d::bigint) FROM public.pagg_tab_p3
+(15 rows)
+
+SELECT postgres_fdw_sum(d::int8) FROM pagg_tab;
+ postgres_fdw_sum
+------------------
+ 58500
+(1 row)
+
+ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_sum(int8);
+DROP AGGREGATE postgres_fdw_sum(int8);
+DROP AGGREGATE postgres_fdw_sum_p_int8(int8);
+DROP FUNCTION f_server_version_lag(int4);
+DROP FUNCTION f_alter_server_version(text, text, int4);
+ALTER SERVER loopback OPTIONS (DROP server_version);
-- ===================================================================
-- access rights and superuser
-- ===================================================================
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 4229d2048c..75283fa2d8 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -211,6 +211,27 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
errmsg("sslcert and sslkey are superuser-only"),
errhint("User mappings with the sslcert or sslkey options set may only be created or modified by the superuser.")));
}
+ else if (strcmp(def->defname, "server_version") == 0)
+ {
+ char* value;
+ int int_val;
+ bool is_parsed;
+
+ value = defGetString(def);
+ is_parsed = parse_int(value, &int_val, 0, NULL);
+
+ if (!is_parsed)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for integer option \"%s\": %s",
+ def->defname, value)));
+
+ if (int_val <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("\"%s\" must be an integer value greater than zero",
+ def->defname)));
+ }
else if (strcmp(def->defname, "analyze_sampling") == 0)
{
char *value;
@@ -288,6 +309,9 @@ InitPgFdwOptions(void)
{"sslcert", UserMappingRelationId, true},
{"sslkey", UserMappingRelationId, true},
+ /* options for partial aggregation pushdown */
+ {"server_version", ForeignServerRelationId, false},
+
{NULL, InvalidOid, false}
};
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d728bd70b3..7b51acadf3 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -519,7 +519,7 @@ static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
- Node *havingQual);
+ Node *havingQual, PartitionwiseAggregateType patype);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
@@ -651,6 +651,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
fpinfo->shippable_extensions = NIL;
fpinfo->fetch_size = 100;
fpinfo->async_capable = false;
+ fpinfo->server_version = 0;
apply_server_options(fpinfo);
apply_table_options(fpinfo);
@@ -6130,6 +6131,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
+ else if (strcmp(def->defname, "server_version") == 0)
+ (void)parse_int(defGetString(def), &fpinfo->server_version, 0, NULL);
}
}
@@ -6188,6 +6191,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
fpinfo->async_capable = fpinfo_o->async_capable;
+ fpinfo->server_version = fpinfo_o->server_version;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
@@ -6367,7 +6371,7 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
*/
static bool
foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
- Node *havingQual)
+ Node *havingQual, PartitionwiseAggregateType patype)
{
Query *query = root->parse;
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
@@ -6381,6 +6385,10 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
if (query->groupingSets)
return false;
+ /* It's unsafe to push having statements with partial aggregates */
+ if ((patype == PARTITIONWISE_AGGREGATE_PARTIAL) && havingQual)
+ return false;
+
/* Get the fpinfo of the underlying scan relation. */
ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
@@ -6619,6 +6627,7 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
/* Ignore stages we don't support; and skip any duplicate calls. */
if ((stage != UPPERREL_GROUP_AGG &&
+ stage != UPPERREL_PARTIAL_GROUP_AGG &&
stage != UPPERREL_ORDERED &&
stage != UPPERREL_FINAL) ||
output_rel->fdw_private)
@@ -6635,6 +6644,10 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
add_foreign_grouping_paths(root, input_rel, output_rel,
(GroupPathExtraData *) extra);
break;
+ case UPPERREL_PARTIAL_GROUP_AGG:
+ add_foreign_grouping_paths(root, input_rel, output_rel,
+ (GroupPathExtraData*)extra);
+ break;
case UPPERREL_ORDERED:
add_foreign_ordered_paths(root, input_rel, output_rel);
break;
@@ -6675,7 +6688,8 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
return;
Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
- extra->patype == PARTITIONWISE_AGGREGATE_FULL);
+ extra->patype == PARTITIONWISE_AGGREGATE_FULL ||
+ extra->patype == PARTITIONWISE_AGGREGATE_PARTIAL);
/* save the input_rel as outerrel in fpinfo */
fpinfo->outerrel = input_rel;
@@ -6695,7 +6709,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
* Use HAVING qual from extra. In case of child partition, it will have
* translated Vars.
*/
- if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
+ if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual, extra->patype))
return;
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 02c1152319..616f559080 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -88,6 +88,9 @@ typedef struct PgFdwRelationInfo
int fetch_size; /* fetch size for this remote table */
+ /* Options for checking compatibility of partial aggregation */
+ int server_version;
+
/*
* Name of the relation, for use while EXPLAINing ForeignScan. It is used
* for join and upper relations but is set for all relations. For a base
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 5bd69339df..dcbf60d919 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2944,16 +2944,34 @@ RESET enable_partitionwise_join;
-- ===================================================================
-- test partitionwise aggregates
-- ===================================================================
+CREATE OR REPLACE FUNCTION f_server_version_lag(version_offset int4) RETURNS text AS $$
+ SELECT (setting::int4 + version_offset)::text FROM pg_settings WHERE name = 'server_version_num';
+$$ LANGUAGE sql;
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE OR REPLACE FUNCTION f_alter_server_version(server_name text, operation text, version_offset int4) RETURNS void AS $$
+ DECLARE
+ version_num text;
+ BEGIN
+ EXECUTE 'SELECT f_server_version_lag($1) '
+ INTO version_num
+ USING version_offset;
+ EXECUTE format('ALTER SERVER %I OPTIONS (%I server_version %L)',
+ server_name, operation, version_num);
+ RETURN;
+ END;
+$$ LANGUAGE plpgsql;
+
+SELECT f_alter_server_version('loopback', 'add', 0);
+
+CREATE TABLE pagg_tab (a int, b int, c text, d int4) PARTITION BY RANGE(a);
CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
-- Create foreign partitions
CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
@@ -2987,6 +3005,65 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
EXPLAIN (COSTS OFF)
SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1;
+SELECT b, min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab GROUP BY b ORDER BY 1;
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab;
+SELECT min(a), max(a), count(*), sum(d), sum(d::int8), avg(d), avg(d::int8) FROM pagg_tab;
+
+-- It's unsafe to push down partial aggregates which contains distinct clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+SELECT max(a), count(distinct b) FROM pagg_tab;
+
+-- It's unsafe to push down partial aggregates when
+-- server_version is older than partialagg_minversion, described by pg_aggregate
+SELECT f_alter_server_version('loopback', 'set', -1);
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT avg(d) FROM pagg_tab;
+SELECT avg(d) FROM pagg_tab;
+
+-- It's safe to push down partial aggregate for user defined aggregate
+-- which has valid options.
+CREATE AGGREGATE postgres_fdw_sum_p_int8(int8) (
+ SFUNC = int8_avg_accum,
+ STYPE = internal,
+ FINALFUNC = int8_avg_serialize
+);
+
+CREATE AGGREGATE postgres_fdw_sum(int8) (
+ SFUNC = int8_avg_accum,
+ STYPE = internal,
+ COMBINEFUNC = int8_avg_combine,
+ FINALFUNC = numeric_poly_sum,
+ SERIALFUNC = int8_avg_serialize,
+ DESERIALFUNC = int8_avg_deserialize,
+ PARTIALAGGFUNC = postgres_fdw_sum_p_int8,
+ PARTIALAGG_MINVERSION = 150000
+);
+
+ALTER EXTENSION postgres_fdw ADD FUNCTION postgres_fdw_sum(int8);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT postgres_fdw_sum(d::int8) FROM pagg_tab;
+SELECT postgres_fdw_sum(d::int8) FROM pagg_tab;
+
+ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_sum(int8);
+DROP AGGREGATE postgres_fdw_sum(int8);
+DROP AGGREGATE postgres_fdw_sum_p_int8(int8);
+
+DROP FUNCTION f_server_version_lag(int4);
+DROP FUNCTION f_alter_server_version(text, text, int4);
+ALTER SERVER loopback OPTIONS (DROP server_version);
+
-- ===================================================================
-- access rights and superuser
-- ===================================================================
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index ebc4454743..037eb06f4d 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -63,6 +63,7 @@ AggregateCreate(const char *aggName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
+ List *partialaggfnName,
bool finalfnExtraArgs,
bool mfinalfnExtraArgs,
char finalfnModify,
@@ -72,6 +73,7 @@ AggregateCreate(const char *aggName,
int32 aggTransSpace,
Oid aggmTransType,
int32 aggmTransSpace,
+ int32 partialaggMinversion,
const char *agginitval,
const char *aggminitval,
char proparallel)
@@ -91,6 +93,7 @@ AggregateCreate(const char *aggName,
Oid mtransfn = InvalidOid; /* can be omitted */
Oid minvtransfn = InvalidOid; /* can be omitted */
Oid mfinalfn = InvalidOid; /* can be omitted */
+ Oid partialaggfn = InvalidOid; /* can be omitted */
Oid sortop = InvalidOid; /* can be omitted */
Oid *aggArgTypes = parameterTypes->values;
bool mtransIsStrict = false;
@@ -569,6 +572,33 @@ AggregateCreate(const char *aggName,
format_type_be(finaltype))));
}
+ /*
+ * Validate the partial aggregate function, if present.
+ */
+ if (partialaggfnName)
+ {
+ HeapTuple aggtup;
+ partialaggfn = LookupFuncName(partialaggfnName, numArgs, aggArgTypes, false);
+
+ /* Check aggregate creator has permission to call the function */
+ aclresult = object_aclcheck(ProcedureRelationId, partialaggfn, GetUserId(), ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(partialaggfn));
+
+ rettype = get_func_rettype(partialaggfn);
+
+ if (((aggTransType == INTERNALOID) && (rettype != BYTEAOID))
+ || ((aggTransType != INTERNALOID) && (rettype != aggTransType)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("return type of partialaggfunc of %s is not stype",
+ NameListToString(partialaggfnName))));
+ aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(partialaggfn));
+ if (!HeapTupleIsValid(aggtup))
+ elog(ERROR, "cache lookup failed for partialaggfunc %u", partialaggfn);
+ ReleaseSysCache(aggtup);
+ }
+
/* handle sortop, if supplied */
if (aggsortopName)
{
@@ -684,6 +714,8 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval);
else
nulls[Anum_pg_aggregate_aggminitval - 1] = true;
+ values[Anum_pg_aggregate_partialaggfn - 1] = ObjectIdGetDatum(partialaggfn);
+ values[Anum_pg_aggregate_partialagg_minversion - 1] = Int32GetDatum(partialaggMinversion);
if (replace)
oldtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(procOid));
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index fda9d1aa77..cb49a79cfa 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -70,6 +70,7 @@ DefineAggregate(ParseState *pstate,
List *combinefuncName = NIL;
List *serialfuncName = NIL;
List *deserialfuncName = NIL;
+ List *partialaggfuncName = NIL;
List *mtransfuncName = NIL;
List *minvtransfuncName = NIL;
List *mfinalfuncName = NIL;
@@ -83,6 +84,7 @@ DefineAggregate(ParseState *pstate,
TypeName *mtransType = NULL;
int32 transSpace = 0;
int32 mtransSpace = 0;
+ int32 partialaggMinversion = 0;
char *initval = NULL;
char *minitval = NULL;
char *parallel = NULL;
@@ -143,6 +145,8 @@ DefineAggregate(ParseState *pstate,
serialfuncName = defGetQualifiedName(defel);
else if (strcmp(defel->defname, "deserialfunc") == 0)
deserialfuncName = defGetQualifiedName(defel);
+ else if (strcmp(defel->defname, "partialaggfunc") == 0)
+ partialaggfuncName = defGetQualifiedName(defel);
else if (strcmp(defel->defname, "msfunc") == 0)
mtransfuncName = defGetQualifiedName(defel);
else if (strcmp(defel->defname, "minvfunc") == 0)
@@ -182,6 +186,8 @@ DefineAggregate(ParseState *pstate,
mtransType = defGetTypeName(defel);
else if (strcmp(defel->defname, "msspace") == 0)
mtransSpace = defGetInt32(defel);
+ else if (strcmp(defel->defname, "partialagg_minversion") == 0)
+ partialaggMinversion = defGetInt32(defel);
else if (strcmp(defel->defname, "initcond") == 0)
initval = defGetString(defel);
else if (strcmp(defel->defname, "initcond1") == 0)
@@ -461,6 +467,7 @@ DefineAggregate(ParseState *pstate,
mtransfuncName, /* fwd trans function name */
minvtransfuncName, /* inv trans function name */
mfinalfuncName, /* final function name */
+ partialaggfuncName,
finalfuncExtraArgs,
mfinalfuncExtraArgs,
finalfuncModify,
@@ -470,6 +477,7 @@ DefineAggregate(ParseState *pstate,
transSpace, /* transition space */
mtransTypeId, /* transition data type */
mtransSpace, /* transition space */
+ partialaggMinversion,
initval, /* initial condition */
minitval, /* initial condition */
proparallel); /* parallel safe? */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7a504dfe25..80165ada03 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -13693,6 +13693,8 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
const char *aggmtransfn;
const char *aggminvtransfn;
const char *aggmfinalfn;
+ const char *partialaggfn;
+ const char *partialagg_minversion;
bool aggfinalextra;
bool aggmfinalextra;
char aggfinalmodify;
@@ -13775,11 +13777,19 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
if (fout->remoteVersion >= 110000)
appendPQExpBufferStr(query,
"aggfinalmodify,\n"
- "aggmfinalmodify\n");
+ "aggmfinalmodify,\n");
else
appendPQExpBufferStr(query,
"'0' AS aggfinalmodify,\n"
- "'0' AS aggmfinalmodify\n");
+ "'0' AS aggmfinalmodify,\n");
+ if (fout->remoteVersion >= 160000)
+ appendPQExpBufferStr(query,
+ "partialaggfn,\n"
+ "partialagg_minversion\n");
+ else
+ appendPQExpBufferStr(query,
+ "'-' AS partialaggfn,\n"
+ "0 AS partialagg_minversion\n");
appendPQExpBufferStr(query,
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
@@ -13805,6 +13815,8 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
aggcombinefn = PQgetvalue(res, 0, PQfnumber(res, "aggcombinefn"));
aggserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggserialfn"));
aggdeserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggdeserialfn"));
+ partialaggfn = PQgetvalue(res, 0, PQfnumber(res, "partialaggfn"));
+ partialagg_minversion = PQgetvalue(res, 0, PQfnumber(res, "partialagg_minversion"));
aggmtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggmtransfn"));
aggminvtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggminvtransfn"));
aggmfinalfn = PQgetvalue(res, 0, PQfnumber(res, "aggmfinalfn"));
@@ -13894,6 +13906,15 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
if (strcmp(aggdeserialfn, "-") != 0)
appendPQExpBuffer(details, ",\n DESERIALFUNC = %s", aggdeserialfn);
+ if (strcmp(partialaggfn, "-") != 0)
+ appendPQExpBuffer(details, ",\n PARTIALAGGFUNC = %s", partialaggfn);
+
+ if (strcmp(partialagg_minversion, "0") != 0)
+ {
+ appendPQExpBuffer(details, ",\n PARTIALAGG_MINVERSION = %s",
+ partialagg_minversion);
+ }
+
if (strcmp(aggmtransfn, "-") != 0)
{
appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s",
diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat
index 283f494bf5..689ed3546a 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -13,22 +13,44 @@
[
# avg
+{ aggfnoid => 'avg_p_int8', aggtransfn => 'int8_avg_accum',
+ aggfinalfn => 'int8_avg_serialize', aggcombinefn => 'int8_avg_combine',
+ aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
+ aggtranstype => 'internal',
+ aggtransspace => '48' },
{ aggfnoid => 'avg(int8)', aggtransfn => 'int8_avg_accum',
aggfinalfn => 'numeric_poly_avg', aggcombinefn => 'int8_avg_combine',
aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
aggmfinalfn => 'numeric_poly_avg', aggtranstype => 'internal',
- aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+ aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+ partialaggfn => 'avg_p_int8', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_int4', aggtransfn => 'int4_avg_accum',
+ aggcombinefn => 'int4_avg_combine',
+ aggtranstype => '_int8',
+ agginitval => '{0,0}' },
{ aggfnoid => 'avg(int4)', aggtransfn => 'int4_avg_accum',
aggfinalfn => 'int8_avg', aggcombinefn => 'int4_avg_combine',
aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
aggmfinalfn => 'int8_avg', aggtranstype => '_int8', aggmtranstype => '_int8',
- agginitval => '{0,0}', aggminitval => '{0,0}' },
+ agginitval => '{0,0}', aggminitval => '{0,0}',
+ partialaggfn => 'avg_p_int4', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_int2', aggtransfn => 'int2_avg_accum',
+ aggcombinefn => 'int4_avg_combine',
+ aggtranstype => '_int8',
+ agginitval => '{0,0}' },
{ aggfnoid => 'avg(int2)', aggtransfn => 'int2_avg_accum',
aggfinalfn => 'int8_avg', aggcombinefn => 'int4_avg_combine',
aggmtransfn => 'int2_avg_accum', aggminvtransfn => 'int2_avg_accum_inv',
aggmfinalfn => 'int8_avg', aggtranstype => '_int8', aggmtranstype => '_int8',
- agginitval => '{0,0}', aggminitval => '{0,0}' },
+ agginitval => '{0,0}', aggminitval => '{0,0}',
+ partialaggfn => 'avg_p_int2', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_numeric', aggtransfn => 'numeric_avg_accum',
+ aggfinalfn => 'numeric_avg_serialize', aggcombinefn => 'numeric_avg_combine',
+ aggserialfn => 'numeric_avg_serialize',
+ aggdeserialfn => 'numeric_avg_deserialize',
+ aggtranstype => 'internal', aggtransspace => '128',
+ partialaggfn => 'avg_p_numeric', partialagg_minversion => '160000' },
{ aggfnoid => 'avg(numeric)', aggtransfn => 'numeric_avg_accum',
aggfinalfn => 'numeric_avg', aggcombinefn => 'numeric_avg_combine',
aggserialfn => 'numeric_avg_serialize',
@@ -36,27 +58,45 @@
aggmtransfn => 'numeric_avg_accum', aggminvtransfn => 'numeric_accum_inv',
aggmfinalfn => 'numeric_avg', aggtranstype => 'internal',
aggtransspace => '128', aggmtranstype => 'internal',
- aggmtransspace => '128' },
+ aggmtransspace => '128',
+ partialaggfn => 'avg_p_numeric', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_float4', aggtransfn => 'float4_accum',
+ aggcombinefn => 'float8_combine',
+ aggtranstype => '_float8', agginitval => '{0,0,0}' },
{ aggfnoid => 'avg(float4)', aggtransfn => 'float4_accum',
aggfinalfn => 'float8_avg', aggcombinefn => 'float8_combine',
+ aggtranstype => '_float8', agginitval => '{0,0,0}',
+ partialaggfn => 'avg_p_float4', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_float8', aggtransfn => 'float8_accum',
+ aggcombinefn => 'float8_combine',
aggtranstype => '_float8', agginitval => '{0,0,0}' },
{ aggfnoid => 'avg(float8)', aggtransfn => 'float8_accum',
aggfinalfn => 'float8_avg', aggcombinefn => 'float8_combine',
- aggtranstype => '_float8', agginitval => '{0,0,0}' },
+ aggtranstype => '_float8', agginitval => '{0,0,0}',
+ partialaggfn => 'avg_p_float8', partialagg_minversion => '160000' },
+{ aggfnoid => 'avg_p_interval', aggtransfn => 'interval_accum',
+ aggcombinefn => 'interval_combine',
+ aggtranstype => '_interval', agginitval => '{0 second,0 second}' },
{ aggfnoid => 'avg(interval)', aggtransfn => 'interval_accum',
aggfinalfn => 'interval_avg', aggcombinefn => 'interval_combine',
aggmtransfn => 'interval_accum', aggminvtransfn => 'interval_accum_inv',
aggmfinalfn => 'interval_avg', aggtranstype => '_interval',
aggmtranstype => '_interval', agginitval => '{0 second,0 second}',
- aggminitval => '{0 second,0 second}' },
+ aggminitval => '{0 second,0 second}',
+ partialaggfn => 'avg_p_interval', partialagg_minversion => '160000' },
# sum
+{ aggfnoid => 'sum_p_int8', aggtransfn => 'int8_avg_accum',
+ aggfinalfn => 'int8_avg_serialize', aggcombinefn => 'int8_avg_combine',
+ aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
+ aggtranstype => 'internal', aggtransspace => '48' },
{ aggfnoid => 'sum(int8)', aggtransfn => 'int8_avg_accum',
aggfinalfn => 'numeric_poly_sum', aggcombinefn => 'int8_avg_combine',
aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
aggmfinalfn => 'numeric_poly_sum', aggtranstype => 'internal',
- aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+ aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+ partialaggfn => 'sum_p_int8', partialagg_minversion => '160000' },
{ aggfnoid => 'sum(int4)', aggtransfn => 'int4_sum', aggcombinefn => 'int8pl',
aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
@@ -76,6 +116,11 @@
aggcombinefn => 'interval_pl', aggmtransfn => 'interval_pl',
aggminvtransfn => 'interval_mi', aggtranstype => 'interval',
aggmtranstype => 'interval' },
+{ aggfnoid => 'sum_p_numeric', aggtransfn => 'numeric_avg_accum',
+ aggfinalfn => 'numeric_avg_serialize', aggcombinefn => 'numeric_avg_combine',
+ aggserialfn => 'numeric_avg_serialize',
+ aggdeserialfn => 'numeric_avg_deserialize',
+ aggtranstype => 'internal', aggtransspace => '128' },
{ aggfnoid => 'sum(numeric)', aggtransfn => 'numeric_avg_accum',
aggfinalfn => 'numeric_sum', aggcombinefn => 'numeric_avg_combine',
aggserialfn => 'numeric_avg_serialize',
@@ -83,7 +128,8 @@
aggmtransfn => 'numeric_avg_accum', aggminvtransfn => 'numeric_accum_inv',
aggmfinalfn => 'numeric_sum', aggtranstype => 'internal',
aggtransspace => '128', aggmtranstype => 'internal',
- aggmtransspace => '128' },
+ aggmtransspace => '128',
+ partialaggfn => 'sum_p_numeric', partialagg_minversion => '160000' },
# max
{ aggfnoid => 'max(int8)', aggtransfn => 'int8larger',
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 3112881193..1cefa1bfba 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -55,6 +55,12 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
/* function to convert bytea to transtype (0 if none) */
regproc aggdeserialfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+ /* special aggregate function for partial aggregation (0 if none) */
+ regproc partialaggfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+
+ /* minimum PostgreSQL's version which has compatible partialaggfn */
+ int32 partialagg_minversion BKI_DEFAULT(0);
+
/* forward function for moving-aggregate mode (0 if none) */
regproc aggmtransfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
@@ -141,6 +147,9 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_aggregate_fnoid_index, 2650, AggregateFnoidIndexId,
#define AGGMODIFY_SHAREABLE 's'
#define AGGMODIFY_READ_WRITE 'w'
+/* Symbolic value for default partialagg_minversion */
+#define PARTIALAGG_MINVERSION_DEFAULT 0
+
#endif /* EXPOSE_TO_CLIENT_CODE */
@@ -164,6 +173,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
+ List *aggpartialfnName,
bool finalfnExtraArgs,
bool mfinalfnExtraArgs,
char finalfnModify,
@@ -173,6 +183,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
int32 aggTransSpace,
Oid aggmTransType,
int32 aggmTransSpace,
+ int32 partialaggMinversion,
const char *agginitval,
const char *aggminitval,
char proparallel);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f2642201..2f27b54133 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6594,35 +6594,67 @@
descr => 'the average (arithmetic mean) as numeric of all bigint values',
proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
proargtypes => 'int8', prosrc => 'aggregate_dummy' },
+{ oid => '13500',
+ descr => 'the partial average (arithmetic mean) as numeric of all bigint values',
+ proname => 'avg_p_int8', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+ proargtypes => 'int8', prosrc => 'aggregate_dummy' },
{ oid => '2101',
descr => 'the average (arithmetic mean) as numeric of all integer values',
proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
proargtypes => 'int4', prosrc => 'aggregate_dummy' },
+{ oid => '13501',
+ descr => 'the partial average (arithmetic mean) as numeric of all integer values',
+ proname => 'avg_p_int4', prokind => 'a', proisstrict => 'f', prorettype => '_int8',
+ proargtypes => 'int4', prosrc => 'aggregate_dummy' },
{ oid => '2102',
descr => 'the average (arithmetic mean) as numeric of all smallint values',
proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
proargtypes => 'int2', prosrc => 'aggregate_dummy' },
+{ oid => '13502',
+ descr => 'the partial average (arithmetic mean) as numeric of all smallint values',
+ proname => 'avg_p_int2', prokind => 'a', proisstrict => 'f', prorettype => '_int8',
+ proargtypes => 'int2', prosrc => 'aggregate_dummy' },
{ oid => '2103',
descr => 'the average (arithmetic mean) as numeric of all numeric values',
proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
+{ oid => '13503',
+ descr => 'the partial average (arithmetic mean) as numeric of all numeric values',
+ proname => 'avg_p_numeric', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+ proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
{ oid => '2104',
descr => 'the average (arithmetic mean) as float8 of all float4 values',
proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'float8',
proargtypes => 'float4', prosrc => 'aggregate_dummy' },
+{ oid => '13504',
+ descr => 'the partial average (arithmetic mean) as float8 of all float4 values',
+ proname => 'avg_p_float4', prokind => 'a', proisstrict => 'f', prorettype => '_float8',
+ proargtypes => 'float4', prosrc => 'aggregate_dummy' },
{ oid => '2105',
descr => 'the average (arithmetic mean) as float8 of all float8 values',
proname => 'avg', prokind => 'a', proisstrict => 'f', prorettype => 'float8',
proargtypes => 'float8', prosrc => 'aggregate_dummy' },
+{ oid => '13505',
+ descr => 'the partial average (arithmetic mean) as float8 of all float8 values',
+ proname => 'avg_p_float8', prokind => 'a', proisstrict => 'f', prorettype => '_float8',
+ proargtypes => 'float8', prosrc => 'aggregate_dummy' },
{ oid => '2106',
descr => 'the average (arithmetic mean) as interval of all interval values',
proname => 'avg', prokind => 'a', proisstrict => 'f',
prorettype => 'interval', proargtypes => 'interval',
prosrc => 'aggregate_dummy' },
+{ oid => '13506',
+ descr => 'the partial average (arithmetic mean) as interval of all interval values',
+ proname => 'avg_p_interval', prokind => 'a', proisstrict => 'f',
+ prorettype => '_interval', proargtypes => 'interval',
+ prosrc => 'aggregate_dummy' },
{ oid => '2107', descr => 'sum as numeric across all bigint input values',
proname => 'sum', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
proargtypes => 'int8', prosrc => 'aggregate_dummy' },
+{ oid => '13507', descr => 'partial sum as numeric across all bigint input values',
+ proname => 'sum_p_int8', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+ proargtypes => 'int8', prosrc => 'aggregate_dummy' },
{ oid => '2108', descr => 'sum as bigint across all integer input values',
proname => 'sum', prokind => 'a', proisstrict => 'f', prorettype => 'int8',
proargtypes => 'int4', prosrc => 'aggregate_dummy' },
@@ -6645,6 +6677,9 @@
{ oid => '2114', descr => 'sum as numeric across all numeric input values',
proname => 'sum', prokind => 'a', proisstrict => 'f', prorettype => 'numeric',
proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
+{ oid => '13508', descr => 'partial sum as numeric across all numeric input values',
+ proname => 'sum_p_numeric', prokind => 'a', proisstrict => 'f', prorettype => 'bytea',
+ proargtypes => 'numeric', prosrc => 'aggregate_dummy' },
{ oid => '2115', descr => 'maximum value of all bigint input values',
proname => 'max', prokind => 'a', proisstrict => 'f', prorettype => 'int8',
diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out
index dcf6909423..3197c8abba 100644
--- a/src/test/regress/expected/create_aggregate.out
+++ b/src/test/regress/expected/create_aggregate.out
@@ -322,3 +322,27 @@ WARNING: aggregate attribute "Finalfunc_extra" not recognized
WARNING: aggregate attribute "Finalfunc_modify" not recognized
WARNING: aggregate attribute "Parallel" not recognized
ERROR: aggregate stype must be specified
+-- invalid: partialaggfunc which doesn't exist
+CREATE AGGREGATE haspartialagg_agg(int8) (
+ sfunc = int8_avg_accum,
+ stype = internal,
+ combinefunc = int8_avg_combine,
+ finalfunc = numeric_poly_sum,
+ serialfunc = int8_avg_serialize,
+ deserialfunc = int8_avg_deserialize,
+ partialaggfunc = partialagg_foo,
+ partialagg_minversion = 160000
+);
+ERROR: function partialagg_foo(bigint) does not exist
+-- invalid: partialagg_minversion is not integer
+CREATE AGGREGATE haspartialagg_agg(int8) (
+ sfunc = int8_avg_accum,
+ stype = internal,
+ combinefunc = int8_avg_combine,
+ finalfunc = numeric_poly_sum,
+ serialfunc = int8_avg_serialize,
+ deserialfunc = int8_avg_deserialize,
+ partialaggfunc = partialagg_foo,
+ partialagg_minversion = aaa
+);
+ERROR: partialagg_minversion requires an integer value
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..49f2c9bc35 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -145,6 +145,7 @@ NOTICE: checking pg_aggregate {aggfinalfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggcombinefn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggserialfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggdeserialfn} => pg_proc {oid}
+NOTICE: checking pg_aggregate {partialaggfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggmtransfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggminvtransfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggmfinalfn} => pg_proc {oid}
diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql
index d4b4036fd7..499c120ca5 100644
--- a/src/test/regress/sql/create_aggregate.sql
+++ b/src/test/regress/sql/create_aggregate.sql
@@ -328,3 +328,27 @@ CREATE AGGREGATE case_agg(float8)
"Finalfunc_modify" = read_write,
"Parallel" = safe
);
+
+-- invalid: partialaggfunc which doesn't exist
+CREATE AGGREGATE haspartialagg_agg(int8) (
+ sfunc = int8_avg_accum,
+ stype = internal,
+ combinefunc = int8_avg_combine,
+ finalfunc = numeric_poly_sum,
+ serialfunc = int8_avg_serialize,
+ deserialfunc = int8_avg_deserialize,
+ partialaggfunc = partialagg_foo,
+ partialagg_minversion = 160000
+);
+
+-- invalid: partialagg_minversion is not integer
+CREATE AGGREGATE haspartialagg_agg(int8) (
+ sfunc = int8_avg_accum,
+ stype = internal,
+ combinefunc = int8_avg_combine,
+ finalfunc = numeric_poly_sum,
+ serialfunc = int8_avg_serialize,
+ deserialfunc = int8_avg_deserialize,
+ partialaggfunc = partialagg_foo,
+ partialagg_minversion = aaa
+);
\ No newline at end of file