Tomas Vondra писал 2022-03-22 15:28:
On 3/22/22 01:49, Andres Freund wrote:
On 2022-01-17 15:27:53 +0300, Alexander Pyhalov wrote:
Alexander Pyhalov писал 2022-01-17 15:26:
Updated patch.

Sorry, missed attachment.

Needs another update: http://cfbot.cputube.org/patch_37_3369.log

Marked as waiting on author.


TBH I'm still not convinced this is the right approach. I've voiced this
opinion before, but to reiterate the main arguments:

1) It's not clear to me how could this get extended to aggregates with
more complex aggregate states, to support e.g. avg() and similar fairly
common aggregates.

Hi.
Yes, I'm also not sure how to proceed with aggregates with complex state. Likely it needs separate function to export their state, but then we should somehow ensure that this function exists and our 'importer' can handle its result. Note that for now we have no mechanics in postgres_fdw to find out remote server version
on planning stage.

2) I'm not sure relying on aggpartialpushdownsafe without any version
checks etc. is sufficient. I mean, how would we know the remote node has
the same idea of representing the aggregate state. I wonder how this
aligns with assumptions we do e.g. for functions etc.

It seems to be not a problem for me, as for now we don't care about remote node internal aggregate state representation.
We currently get just aggregate result from remote node. For aggregates
with 'internal' stype we call converter locally, and it converts external result from
aggregate return type to local node internal representation.


Aside from that, there's a couple review comments:

1) should not remove the comment in foreign_expr_walker

Fixed.


2) comment in deparseAggref is obsolete/inaccurate

Fixed.


3) comment for partial_agg_ok should probably explain when we consider
aggregate OK to be pushed down

Expanded comment.

4) I'm not sure why get_rcvd_attinmeta comment talks about "return type
bytea" and "real input type".

Expanded comment. Tupdesc can be retrieved from node->ss.ss_ScanTupleSlot, and so we expect to see bytea (as should be produced by partial aggregation).
But when we scan data, we get aggregate
output type (which matches converter input type), so attinmeta should
be fixed.
If we deal with aggregate which doesn't have converter, partial_agg_ok()
ensures that agg->aggfnoid return type matches agg->aggtranstype.


5) Talking about "partial" aggregates is a bit confusing, because that
suggests this is related to actual "partial aggregates". But it's not.

How should we call them? It's about pushing "Partial count()" or "Partial sum()" to the remote server, why it's not related to partial aggregates? Do you mean that it's not about parallel aggregate processing?

6) Can add_foreign_grouping_paths do without the new 'partial'
parameter? Clearly, it can be deduced from extra->patype, no?

Fixed this.


7) There's no docs for PARTIALCONVERTERFUNC / PARTIAL_PUSHDOWN_SAFE in
CREATE AGGREGATE sgml docs.

Added documentation. I'd appreciate advice on how it should be extended.


8) I don't think "serialize" in the converter functions is the right
term, considering those functions are not "serializing" anything. If
anything, it's the remote node that is serializing the agg state and the
local not is deserializing it. Or maybe I just misunderstand where are
the converter functions executed?

Converter function transforms aggregate result to serialized internal representation,
which is expected from partial aggregate. I mean, it converts aggregate
result type to internal representation and then efficiently executes
serialization code (i.e. converter(x) == serialize(to_internal(x))).

--
Best regards,
Alexander Pyhalov,
Postgres Professional
From 7ad4eacf017a4fd3793f0949ef43ccc8292bf3f6 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <a.pyha...@postgrespro.ru>
Date: Thu, 14 Oct 2021 17:30:34 +0300
Subject: [PATCH] Partial aggregates push down

---
 contrib/postgres_fdw/deparse.c                |  63 +++++-
 .../postgres_fdw/expected/postgres_fdw.out    | 185 ++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.c           | 187 +++++++++++++++++-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  27 ++-
 doc/src/sgml/ref/create_aggregate.sgml        |  27 +++
 src/backend/catalog/pg_aggregate.c            |  28 ++-
 src/backend/commands/aggregatecmds.c          |  23 ++-
 src/backend/utils/adt/numeric.c               |  96 +++++++++
 src/bin/pg_dump/pg_dump.c                     |  20 +-
 src/include/catalog/pg_aggregate.dat          | 110 ++++++-----
 src/include/catalog/pg_aggregate.h            |  10 +-
 src/include/catalog/pg_proc.dat               |   6 +
 src/test/regress/expected/oidjoins.out        |   1 +
 13 files changed, 702 insertions(+), 81 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index bf12eac0288..272e2391d7f 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -197,6 +197,7 @@ static bool is_subquery_var(Var *node, RelOptInfo *foreignrel,
 static void get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
 										  int *relno, int *colno);
 
+static bool partial_agg_ok(Aggref *agg);
 
 /*
  * Examine each qual clause in input_conds, and classify them into two groups,
@@ -833,7 +834,10 @@ foreign_expr_walker(Node *node,
 					return false;
 
 				/* Only non-split aggregates are pushable. */
-				if (agg->aggsplit != AGGSPLIT_SIMPLE)
+				if ((agg->aggsplit != AGGSPLIT_SIMPLE) && (agg->aggsplit != AGGSPLIT_INITIAL_SERIAL))
+					return false;
+
+				if (agg->aggsplit == AGGSPLIT_INITIAL_SERIAL && !partial_agg_ok(agg))
 					return false;
 
 				/* As usual, it must be shippable. */
@@ -3348,8 +3352,8 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
 	StringInfo	buf = context->buf;
 	bool		use_variadic;
 
-	/* Only basic, non-split aggregation accepted. */
-	Assert(node->aggsplit == AGGSPLIT_SIMPLE);
+	/* Only non-split aggregation accepted. */
+	Assert(node->aggsplit == AGGSPLIT_SIMPLE || node->aggsplit == AGGSPLIT_INITIAL_SERIAL);
 
 	/* Check if need to print VARIADIC (cf. ruleutils.c) */
 	use_variadic = node->aggvariadic;
@@ -3819,3 +3823,56 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
 	/* Shouldn't get here */
 	elog(ERROR, "unexpected expression in subquery output");
 }
+
+/*
+ * Check that partial aggregate agg is fine to push down
+ *
+ * Check that it's AGGKIND_NORMAL aggregate without
+ * distinct or order by clauses, which has aggpartialpushdownsafe
+ * property. If aggtranstype is internal, aggpartialconverterfn
+ * should be defined. Otherwise aggfn return type should match aggtranstype.
+ */
+static bool
+partial_agg_ok(Aggref *agg)
+{
+	HeapTuple	aggtup;
+	Form_pg_aggregate aggform;
+
+	Assert(agg->aggsplit == AGGSPLIT_INITIAL_SERIAL);
+
+	/* We don't support complex partial aggregates */
+	if (agg->aggdistinct || agg->aggvariadic || agg->aggkind != AGGKIND_NORMAL || agg->aggorder != NIL)
+		return false;
+
+	aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(agg->aggfnoid));
+	if (!HeapTupleIsValid(aggtup))
+		elog(ERROR, "cache lookup failed for function %u", agg->aggfnoid);
+	aggform = (Form_pg_aggregate) GETSTRUCT(aggtup);
+
+	/* Only aggregates, marked as pushdown safe, are allowed */
+	if (!aggform->aggpartialpushdownsafe)
+	{
+		ReleaseSysCache(aggtup);
+		return false;
+	}
+
+	/*
+	 * If an aggregate requires serialization/deserialization, partial
+	 * converter should be defined
+	 */
+	if (agg->aggtranstype == INTERNALOID && aggform->aggpartialconverterfn == InvalidOid)
+	{
+		ReleaseSysCache(aggtup);
+		return false;
+	}
+
+	/* In this case we currently don't use converter */
+	if (agg->aggtranstype != INTERNALOID && get_func_rettype(agg->aggfnoid) != agg->aggtranstype)
+	{
+		ReleaseSysCache(aggtup);
+		return false;
+	}
+
+	ReleaseSysCache(aggtup);
+	return true;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index f210f911880..e020e9b8a41 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9336,13 +9336,13 @@ RESET enable_partitionwise_join;
 -- ===================================================================
 -- test partitionwise aggregates
 -- ===================================================================
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE TABLE pagg_tab (a int, b int, c text, d numeric) PARTITION BY RANGE(a);
 CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
 CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
 CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
 -- Create foreign partitions
 CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
 CREATE FOREIGN TABLE fpagg_tab_p2 PARTITION OF pagg_tab FOR VALUES FROM (10) TO (20) SERVER loopback OPTIONS (table_name 'pagg_tab_p2');
@@ -9401,8 +9401,8 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
 -- Should have all the columns in the target list for the given relation
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
-                               QUERY PLAN                               
-------------------------------------------------------------------------
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
  Sort
    Output: t1.a, (count(((t1.*)::pagg_tab)))
    Sort Key: t1.a
@@ -9413,21 +9413,21 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
                Filter: (avg(t1.b) < '22'::numeric)
                ->  Foreign Scan on public.fpagg_tab_p1 t1
                      Output: t1.a, t1.*, t1.b
-                     Remote SQL: SELECT a, b, c FROM public.pagg_tab_p1
+                     Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p1
          ->  HashAggregate
                Output: t1_1.a, count(((t1_1.*)::pagg_tab))
                Group Key: t1_1.a
                Filter: (avg(t1_1.b) < '22'::numeric)
                ->  Foreign Scan on public.fpagg_tab_p2 t1_1
                      Output: t1_1.a, t1_1.*, t1_1.b
-                     Remote SQL: SELECT a, b, c FROM public.pagg_tab_p2
+                     Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p2
          ->  HashAggregate
                Output: t1_2.a, count(((t1_2.*)::pagg_tab))
                Group Key: t1_2.a
                Filter: (avg(t1_2.b) < '22'::numeric)
                ->  Foreign Scan on public.fpagg_tab_p3 t1_2
                      Output: t1_2.a, t1_2.*, t1_2.b
-                     Remote SQL: SELECT a, b, c FROM public.pagg_tab_p3
+                     Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p3
 (25 rows)
 
 SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
@@ -9463,6 +9463,173 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700
                      ->  Foreign Scan on fpagg_tab_p3 pagg_tab_2
 (15 rows)
 
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+                                                    QUERY PLAN                                                    
+------------------------------------------------------------------------------------------------------------------
+ Sort
+   Output: pagg_tab.b, (max(pagg_tab.a)), (count(*))
+   Sort Key: pagg_tab.b
+   ->  Finalize HashAggregate
+         Output: pagg_tab.b, max(pagg_tab.a), count(*)
+         Group Key: pagg_tab.b
+         Filter: (sum(pagg_tab.a) < 700)
+         ->  Append
+               ->  Partial HashAggregate
+                     Output: pagg_tab.b, PARTIAL max(pagg_tab.a), PARTIAL count(*), PARTIAL sum(pagg_tab.a)
+                     Group Key: pagg_tab.b
+                     ->  Foreign Scan on public.fpagg_tab_p1 pagg_tab
+                           Output: pagg_tab.b, pagg_tab.a
+                           Remote SQL: SELECT a, b FROM public.pagg_tab_p1
+               ->  Partial HashAggregate
+                     Output: pagg_tab_1.b, PARTIAL max(pagg_tab_1.a), PARTIAL count(*), PARTIAL sum(pagg_tab_1.a)
+                     Group Key: pagg_tab_1.b
+                     ->  Foreign Scan on public.fpagg_tab_p2 pagg_tab_1
+                           Output: pagg_tab_1.b, pagg_tab_1.a
+                           Remote SQL: SELECT a, b FROM public.pagg_tab_p2
+               ->  Partial HashAggregate
+                     Output: pagg_tab_2.b, PARTIAL max(pagg_tab_2.a), PARTIAL count(*), PARTIAL sum(pagg_tab_2.a)
+                     Group Key: pagg_tab_2.b
+                     ->  Foreign Scan on public.fpagg_tab_p3 pagg_tab_2
+                           Output: pagg_tab_2.b, pagg_tab_2.a
+                           Remote SQL: SELECT a, b FROM public.pagg_tab_p3
+(26 rows)
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+                                          QUERY PLAN                                           
+-----------------------------------------------------------------------------------------------
+ Sort
+   Output: pagg_tab.b, (max(pagg_tab.a)), (count(*))
+   Sort Key: pagg_tab.b
+   ->  Finalize HashAggregate
+         Output: pagg_tab.b, max(pagg_tab.a), count(*)
+         Group Key: pagg_tab.b
+         ->  Append
+               ->  Foreign Scan
+                     Output: pagg_tab.b, (PARTIAL max(pagg_tab.a)), (PARTIAL count(*))
+                     Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+                     Remote SQL: SELECT b, max(a), count(*) FROM public.pagg_tab_p1 GROUP BY 1
+               ->  Foreign Scan
+                     Output: pagg_tab_1.b, (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*))
+                     Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+                     Remote SQL: SELECT b, max(a), count(*) FROM public.pagg_tab_p2 GROUP BY 1
+               ->  Foreign Scan
+                     Output: pagg_tab_2.b, (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*))
+                     Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+                     Remote SQL: SELECT b, max(a), count(*) FROM public.pagg_tab_p3 GROUP BY 1
+(19 rows)
+
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+ b  | max | count 
+----+-----+-------
+  0 |  20 |    60
+  1 |  21 |    60
+  2 |  22 |    60
+  3 |  23 |    60
+  4 |  24 |    60
+  5 |  25 |    60
+  6 |  26 |    60
+  7 |  27 |    60
+  8 |  28 |    60
+  9 |  29 |    60
+ 10 |  20 |    60
+ 11 |  21 |    60
+ 12 |  22 |    60
+ 13 |  23 |    60
+ 14 |  24 |    60
+ 15 |  25 |    60
+ 16 |  26 |    60
+ 17 |  27 |    60
+ 18 |  28 |    60
+ 19 |  29 |    60
+ 20 |  20 |    60
+ 21 |  21 |    60
+ 22 |  22 |    60
+ 23 |  23 |    60
+ 24 |  24 |    60
+ 25 |  25 |    60
+ 26 |  26 |    60
+ 27 |  27 |    60
+ 28 |  28 |    60
+ 29 |  29 |    60
+ 30 |  20 |    60
+ 31 |  21 |    60
+ 32 |  22 |    60
+ 33 |  23 |    60
+ 34 |  24 |    60
+ 35 |  25 |    60
+ 36 |  26 |    60
+ 37 |  27 |    60
+ 38 |  28 |    60
+ 39 |  29 |    60
+ 40 |  20 |    60
+ 41 |  21 |    60
+ 42 |  22 |    60
+ 43 |  23 |    60
+ 44 |  24 |    60
+ 45 |  25 |    60
+ 46 |  26 |    60
+ 47 |  27 |    60
+ 48 |  28 |    60
+ 49 |  29 |    60
+(50 rows)
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+                                             QUERY PLAN                                             
+----------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   Output: max(pagg_tab.a), count(*), sum(pagg_tab.d)
+   ->  Append
+         ->  Foreign Scan
+               Output: (PARTIAL max(pagg_tab.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab.d))
+               Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+               Remote SQL: SELECT max(a), count(*), sum(d) FROM public.pagg_tab_p1
+         ->  Foreign Scan
+               Output: (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_1.d))
+               Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+               Remote SQL: SELECT max(a), count(*), sum(d) FROM public.pagg_tab_p2
+         ->  Foreign Scan
+               Output: (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_2.d))
+               Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+               Remote SQL: SELECT max(a), count(*), sum(d) FROM public.pagg_tab_p3
+(15 rows)
+
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+ max | count |  sum  
+-----+-------+-------
+  29 |  3000 | 58500
+(1 row)
+
+-- Shouldn't try to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+                          QUERY PLAN                           
+---------------------------------------------------------------
+ Aggregate
+   Output: max(pagg_tab.a), count(DISTINCT pagg_tab.b)
+   ->  Append
+         ->  Foreign Scan on public.fpagg_tab_p1 pagg_tab_1
+               Output: pagg_tab_1.a, pagg_tab_1.b
+               Remote SQL: SELECT a, b FROM public.pagg_tab_p1
+         ->  Foreign Scan on public.fpagg_tab_p2 pagg_tab_2
+               Output: pagg_tab_2.a, pagg_tab_2.b
+               Remote SQL: SELECT a, b FROM public.pagg_tab_p2
+         ->  Foreign Scan on public.fpagg_tab_p3 pagg_tab_3
+               Output: pagg_tab_3.a, pagg_tab_3.b
+               Remote SQL: SELECT a, b FROM public.pagg_tab_p3
+(12 rows)
+
+SELECT max(a), count(distinct b) FROM pagg_tab;
+ max | count 
+-----+-------
+  29 |    50
+(1 row)
+
 -- ===================================================================
 -- access rights and superuser
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 56654844e8f..6b373c85a4d 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,8 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_aggregate.h"
+#include "catalog/pg_proc.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -48,6 +50,7 @@
 #include "utils/rel.h"
 #include "utils/sampling.h"
 #include "utils/selfuncs.h"
+#include "utils/syscache.h"
 
 PG_MODULE_MAGIC;
 
@@ -80,7 +83,12 @@ enum FdwScanPrivateIndex
 	 * String describing join i.e. names of relations being joined and types
 	 * of join, added when the scan is join
 	 */
-	FdwScanPrivateRelations
+	FdwScanPrivateRelations,
+
+	/*
+	 * List of functions to convert partial aggregate result
+	 */
+	FdwScanPrivateConverters
 };
 
 /*
@@ -143,6 +151,7 @@ typedef struct PgFdwScanState
 	/* extracted fdw_private data */
 	char	   *query;			/* text of SELECT command */
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
+	List	   *conv_list;		/* list of converters */
 
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
@@ -474,6 +483,7 @@ static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
 static void finish_foreign_modify(PgFdwModifyState *fmstate);
 static void deallocate_query(PgFdwModifyState *fmstate);
+static List *build_conv_list(RelOptInfo *foreignrel);
 static List *build_remote_returning(Index rtindex, Relation rel,
 									List *returningList);
 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -510,6 +520,7 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 											Relation rel,
 											AttInMetadata *attinmeta,
 											List *retrieved_attrs,
+											List *conv_list,
 											ForeignScanState *fsstate,
 											MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
@@ -517,7 +528,7 @@ static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
 							JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
 							JoinPathExtraData *extra);
 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
-								Node *havingQual);
+								Node *havingQual, PartitionwiseAggregateType patype);
 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
 											  RelOptInfo *rel);
 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
@@ -541,7 +552,6 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -1233,6 +1243,7 @@ postgresGetForeignPlan(PlannerInfo *root,
 	List	   *local_exprs = NIL;
 	List	   *params_list = NIL;
 	List	   *fdw_scan_tlist = NIL;
+	List	   *fdw_conv_list = NIL;
 	List	   *fdw_recheck_quals = NIL;
 	List	   *retrieved_attrs;
 	StringInfoData sql;
@@ -1336,6 +1347,9 @@ postgresGetForeignPlan(PlannerInfo *root,
 		/* Build the list of columns to be fetched from the foreign server. */
 		fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
 
+		/* Build the list of converters for partial aggregates. */
+		fdw_conv_list = build_conv_list(foreignrel);
+
 		/*
 		 * Ensure that the outer plan produces a tuple whose descriptor
 		 * matches our scan tuple slot.  Also, remove the local conditions
@@ -1415,6 +1429,8 @@ postgresGetForeignPlan(PlannerInfo *root,
 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
 		fdw_private = lappend(fdw_private,
 							  makeString(fpinfo->relation_name));
+	if (IS_UPPER_REL(foreignrel))
+		fdw_private = lappend(fdw_private, fdw_conv_list);
 
 	/*
 	 * Create the ForeignScan node for the given relation.
@@ -1433,6 +1449,49 @@ postgresGetForeignPlan(PlannerInfo *root,
 							outer_plan);
 }
 
+/*
+ * Generate attinmeta if there are some converters:
+ * corresponding attribute type should be converter
+ * input type, not expected result type (BYTEA).
+ */
+static AttInMetadata *
+get_rcvd_attinmeta(TupleDesc tupdesc, List *conv_list)
+{
+	TupleDesc	rcvd_tupdesc;
+
+	Assert(conv_list != NIL);
+
+	rcvd_tupdesc = CreateTupleDescCopy(tupdesc);
+	for (int i = 0; i < rcvd_tupdesc->natts; i++)
+	{
+		Oid			converter = InvalidOid;
+		Form_pg_attribute att = TupleDescAttr(rcvd_tupdesc, i);
+
+		converter = list_nth_oid(conv_list, i);
+		if (converter != InvalidOid)
+		{
+			HeapTuple	proctup;
+			Form_pg_proc procform;
+
+			proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(converter));
+
+			if (!HeapTupleIsValid(proctup))
+				elog(ERROR, "cache lookup failed for function %u", converter);
+
+			procform = (Form_pg_proc) GETSTRUCT(proctup);
+
+			if (procform->pronargs != 1)
+				elog(ERROR, "converter %s is expected to have one argument", NameStr(procform->proname));
+
+			att->atttypid = procform->proargtypes.values[0];
+
+			ReleaseSysCache(proctup);
+		}
+	}
+
+	return TupleDescGetAttInMetadata(rcvd_tupdesc);
+}
+
 /*
  * Construct a tuple descriptor for the scan tuples handled by a foreign join.
  */
@@ -1545,6 +1604,12 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 									 FdwScanPrivateSelectSql));
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 												 FdwScanPrivateRetrievedAttrs);
+
+	if (list_length(fsplan->fdw_private) > FdwScanPrivateConverters)
+		fsstate->conv_list = (List *) list_nth(fsplan->fdw_private,
+											   FdwScanPrivateConverters);
+
+
 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
 										  FdwScanPrivateFetchSize));
 
@@ -1571,7 +1636,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
 	}
 
-	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
+	if (fsstate->conv_list != NIL)
+		fsstate->attinmeta = get_rcvd_attinmeta(fsstate->tupdesc, fsstate->conv_list);
+	else
+		fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
 
 	/*
 	 * Prepare for processing of parameters used in remote query, if any.
@@ -3834,6 +3902,7 @@ fetch_more_data(ForeignScanState *node)
 										   fsstate->rel,
 										   fsstate->attinmeta,
 										   fsstate->retrieved_attrs,
+										   fsstate->conv_list,
 										   node,
 										   fsstate->temp_cxt);
 		}
@@ -4321,6 +4390,7 @@ store_returning_result(PgFdwModifyState *fmstate,
 											fmstate->rel,
 											fmstate->attinmeta,
 											fmstate->retrieved_attrs,
+											NIL,
 											NULL,
 											fmstate->temp_cxt);
 
@@ -4615,6 +4685,7 @@ get_returning_data(ForeignScanState *node)
 												dmstate->rel,
 												dmstate->attinmeta,
 												dmstate->retrieved_attrs,
+												NIL,
 												node,
 												dmstate->temp_cxt);
 			ExecStoreHeapTuple(newtup, slot, false);
@@ -5195,6 +5266,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
 													   astate->rel,
 													   astate->attinmeta,
 													   astate->retrieved_attrs,
+													   NIL,
 													   NULL,
 													   astate->temp_cxt);
 
@@ -6095,7 +6167,7 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
  */
 static bool
 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
-					Node *havingQual)
+					Node *havingQual, PartitionwiseAggregateType patype)
 {
 	Query	   *query = root->parse;
 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
@@ -6109,6 +6181,11 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
 	if (query->groupingSets)
 		return false;
 
+
+	/* It's unsafe to push having statements with partial aggregates */
+	if ((patype == PARTITIONWISE_AGGREGATE_PARTIAL) && havingQual)
+		return false;
+
 	/* Get the fpinfo of the underlying scan relation. */
 	ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
 
@@ -6348,6 +6425,7 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
 
 	/* Ignore stages we don't support; and skip any duplicate calls. */
 	if ((stage != UPPERREL_GROUP_AGG &&
+		 stage != UPPERREL_PARTIAL_GROUP_AGG &&
 		 stage != UPPERREL_ORDERED &&
 		 stage != UPPERREL_FINAL) ||
 		output_rel->fdw_private)
@@ -6364,6 +6442,10 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
 			add_foreign_grouping_paths(root, input_rel, output_rel,
 									   (GroupPathExtraData *) extra);
 			break;
+		case UPPERREL_PARTIAL_GROUP_AGG:
+			add_foreign_grouping_paths(root, input_rel, output_rel,
+									   (GroupPathExtraData *) extra);
+			break;
 		case UPPERREL_ORDERED:
 			add_foreign_ordered_paths(root, input_rel, output_rel);
 			break;
@@ -6404,7 +6486,8 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
 		return;
 
 	Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
-		   extra->patype == PARTITIONWISE_AGGREGATE_FULL);
+		   extra->patype == PARTITIONWISE_AGGREGATE_FULL ||
+		   extra->patype == PARTITIONWISE_AGGREGATE_PARTIAL);
 
 	/* save the input_rel as outerrel in fpinfo */
 	fpinfo->outerrel = input_rel;
@@ -6424,7 +6507,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
 	 * Use HAVING qual from extra. In case of child partition, it will have
 	 * translated Vars.
 	 */
-	if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
+	if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual, extra->patype))
 		return;
 
 	/*
@@ -7122,6 +7205,30 @@ complete_pending_request(AsyncRequest *areq)
 							  TupIsNull(areq->result) ? 0.0 : 1.0);
 }
 
+/*
+ * Interface to fmgr to call converters
+ */
+static Datum
+call_converter(Oid converter, Oid collation, Datum value, bool isnull, bool *res_isnull)
+{
+	LOCAL_FCINFO(fcinfo, 1);
+	FmgrInfo	flinfo;
+	Datum		result;
+
+	fmgr_info(converter, &flinfo);
+
+	InitFunctionCallInfoData(*fcinfo, &flinfo, 1, collation, NULL, NULL);
+
+	fcinfo->args[0].value = value;
+	fcinfo->args[0].isnull = isnull;
+
+	result = FunctionCallInvoke(fcinfo);
+
+	if (res_isnull)
+		*res_isnull = fcinfo->isnull;
+	return result;
+}
+
 /*
  * Create a tuple from the specified row of the PGresult.
  *
@@ -7141,6 +7248,7 @@ make_tuple_from_result_row(PGresult *res,
 						   Relation rel,
 						   AttInMetadata *attinmeta,
 						   List *retrieved_attrs,
+						   List *conv_list,
 						   ForeignScanState *fsstate,
 						   MemoryContext temp_context)
 {
@@ -7223,6 +7331,20 @@ make_tuple_from_result_row(PGresult *res,
 											  valstr,
 											  attinmeta->attioparams[i - 1],
 											  attinmeta->atttypmods[i - 1]);
+			if (conv_list != NIL)
+			{
+				Oid			converter = list_nth_oid(conv_list, i - 1);
+
+				if (converter != InvalidOid)
+				{
+					Form_pg_attribute att = TupleDescAttr(tupdesc, i);
+					bool		res_isnull;
+
+					values[i - 1] = call_converter(converter, att->attcollation, values[i - 1], nulls[i - 1], &res_isnull);
+
+					nulls[i - 1] = res_isnull;
+				}
+			}
 		}
 		else if (i == SelfItemPointerAttributeNumber)
 		{
@@ -7486,3 +7608,54 @@ get_batch_size_option(Relation rel)
 
 	return batch_size;
 }
+
+/*
+ * For UPPER_REL build a list of converters, corresponding to tlist entries.
+ */
+static List *
+build_conv_list(RelOptInfo *foreignrel)
+{
+	List	   *conv_list = NIL;
+	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
+	ListCell   *lc;
+
+	if (!IS_UPPER_REL(foreignrel))
+		return NIL;
+
+	/* For UPPER_REL tlist matches grouped_tlist */
+	foreach(lc, fpinfo->grouped_tlist)
+	{
+		TargetEntry *tlentry = (TargetEntry *) lfirst(lc);
+		Oid			converter_oid = InvalidOid;
+
+		if (IsA(tlentry->expr, Aggref))
+		{
+			Aggref	   *agg = (Aggref *) tlentry->expr;
+
+			if (agg->aggsplit == AGGSPLIT_INITIAL_SERIAL && agg->aggtranstype == INTERNALOID)
+			{
+				HeapTuple	aggtup;
+				Form_pg_aggregate aggform;
+
+				aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(agg->aggfnoid));
+				if (!HeapTupleIsValid(aggtup))
+					elog(ERROR, "cache lookup failed for function %u", agg->aggfnoid);
+
+				aggform = (Form_pg_aggregate) GETSTRUCT(aggtup);
+
+				converter_oid = aggform->aggpartialconverterfn;
+				Assert(converter_oid != InvalidOid);
+
+				ReleaseSysCache(aggtup);
+			}
+		}
+
+		/*
+		 * We append InvalidOid to conv_list to preserve one-to-one mapping
+		 * between tlist and conv_list members.
+		 */
+		conv_list = lappend_oid(conv_list, converter_oid);
+	}
+
+	return conv_list;
+}
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 95b6b7192e6..88210452e3f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2755,15 +2755,15 @@ RESET enable_partitionwise_join;
 -- test partitionwise aggregates
 -- ===================================================================
 
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE TABLE pagg_tab (a int, b int, c text, d numeric) PARTITION BY RANGE(a);
 
 CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
 CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
 CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
 
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
 
 -- Create foreign partitions
 CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
@@ -2797,6 +2797,25 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
 EXPLAIN (COSTS OFF)
 SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
 
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+
+-- Shouldn't try to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+SELECT max(a), count(distinct b) FROM pagg_tab;
+
 -- ===================================================================
 -- access rights and superuser
 -- ===================================================================
diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml
index 222e0aa5c9d..b0546dcb1d2 100644
--- a/doc/src/sgml/ref/create_aggregate.sgml
+++ b/doc/src/sgml/ref/create_aggregate.sgml
@@ -31,6 +31,7 @@ CREATE [ OR REPLACE ] AGGREGATE <replaceable class="parameter">name</replaceable
     [ , COMBINEFUNC = <replaceable class="parameter">combinefunc</replaceable> ]
     [ , SERIALFUNC = <replaceable class="parameter">serialfunc</replaceable> ]
     [ , DESERIALFUNC = <replaceable class="parameter">deserialfunc</replaceable> ]
+    [ , PARTIALCONVERTERFUNC = <replaceable class="parameter">partialconverterfunc</replaceable> ]
     [ , INITCOND = <replaceable class="parameter">initial_condition</replaceable> ]
     [ , MSFUNC = <replaceable class="parameter">msfunc</replaceable> ]
     [ , MINVFUNC = <replaceable class="parameter">minvfunc</replaceable> ]
@@ -42,6 +43,7 @@ CREATE [ OR REPLACE ] AGGREGATE <replaceable class="parameter">name</replaceable
     [ , MINITCOND = <replaceable class="parameter">minitial_condition</replaceable> ]
     [ , SORTOP = <replaceable class="parameter">sort_operator</replaceable> ]
     [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]
+    [ , PARTIAL_PUSHDOWN_SAFE ]
 )
 
 CREATE [ OR REPLACE ] AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replaceable class="parameter">argmode</replaceable> ] [ <replaceable class="parameter">argname</replaceable> ] <replaceable class="parameter">arg_data_type</replaceable> [ , ... ] ]
@@ -610,6 +612,21 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">partialconverterfunc</replaceable></term>
+    <listitem>
+     <para>
+      An aggregate function whose <replaceable class="parameter">state_data_type</replaceable>
+      is <type>internal</type> can be marked as <replaceable class="parameter">PARTIAL_PUSHDOWN_SAFE</replaceable>
+      only if it has a <replaceable class="parameter">partialconverterfunc</replaceable> function,
+      which must convert partial result representation from aggregate result type
+      to serialized internal representation. When this function is defined,
+      it should get one argument, type of which matches aggrregate
+      result type, and should return <type>bytea</type>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">sort_operator</replaceable></term>
     <listitem>
@@ -639,6 +656,16 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>PARTIAL_PUSHDOWN_SAFE</literal></term>
+    <listitem>
+     <para>
+      This flag specifies that partial aggregate computation can be delegated
+      to foreign server.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>HYPOTHETICAL</literal></term>
     <listitem>
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index 0d0daa69b34..521794cfbea 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -60,6 +60,7 @@ AggregateCreate(const char *aggName,
 				List *aggcombinefnName,
 				List *aggserialfnName,
 				List *aggdeserialfnName,
+				List *aggpartialconverterfnName,
 				List *aggmtransfnName,
 				List *aggminvtransfnName,
 				List *aggmfinalfnName,
@@ -74,7 +75,8 @@ AggregateCreate(const char *aggName,
 				int32 aggmTransSpace,
 				const char *agginitval,
 				const char *aggminitval,
-				char proparallel)
+				char proparallel,
+				bool partialPushdownSafe)
 {
 	Relation	aggdesc;
 	HeapTuple	tup;
@@ -88,6 +90,7 @@ AggregateCreate(const char *aggName,
 	Oid			combinefn = InvalidOid; /* can be omitted */
 	Oid			serialfn = InvalidOid;	/* can be omitted */
 	Oid			deserialfn = InvalidOid;	/* can be omitted */
+	Oid			partialconverterfn = InvalidOid;	/* can be omitted */
 	Oid			mtransfn = InvalidOid;	/* can be omitted */
 	Oid			minvtransfn = InvalidOid;	/* can be omitted */
 	Oid			mfinalfn = InvalidOid;	/* can be omitted */
@@ -569,6 +572,27 @@ AggregateCreate(const char *aggName,
 							format_type_be(finaltype))));
 	}
 
+	/*
+	 * Validate the partial converter, if present.
+	 */
+	if (aggpartialconverterfnName)
+	{
+		fnArgs[0] = finaltype;
+
+		partialconverterfn = lookup_agg_function(aggpartialconverterfnName, 1,
+											  fnArgs, InvalidOid,
+											  &rettype);
+
+		if (rettype != BYTEAOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("return type of partial serialization function %s is not %s",
+							NameListToString(aggserialfnName),
+							format_type_be(BYTEAOID))));
+
+	}
+
+
 	/* handle sortop, if supplied */
 	if (aggsortopName)
 	{
@@ -664,6 +688,7 @@ AggregateCreate(const char *aggName,
 	values[Anum_pg_aggregate_aggcombinefn - 1] = ObjectIdGetDatum(combinefn);
 	values[Anum_pg_aggregate_aggserialfn - 1] = ObjectIdGetDatum(serialfn);
 	values[Anum_pg_aggregate_aggdeserialfn - 1] = ObjectIdGetDatum(deserialfn);
+	values[Anum_pg_aggregate_aggpartialconverterfn - 1] = ObjectIdGetDatum(partialconverterfn);
 	values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
 	values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
 	values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
@@ -676,6 +701,7 @@ AggregateCreate(const char *aggName,
 	values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
 	values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType);
 	values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace);
+	values[Anum_pg_aggregate_aggpartialpushdownsafe - 1] = BoolGetDatum(partialPushdownSafe);
 	if (agginitval)
 		values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval);
 	else
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index 010eca7340a..414fc09a263 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -69,11 +69,13 @@ DefineAggregate(ParseState *pstate,
 	List	   *combinefuncName = NIL;
 	List	   *serialfuncName = NIL;
 	List	   *deserialfuncName = NIL;
+	List	   *partialconverterfuncName = NIL;
 	List	   *mtransfuncName = NIL;
 	List	   *minvtransfuncName = NIL;
 	List	   *mfinalfuncName = NIL;
 	bool		finalfuncExtraArgs = false;
 	bool		mfinalfuncExtraArgs = false;
+	bool		partialPushdownSafe = false;
 	char		finalfuncModify = 0;
 	char		mfinalfuncModify = 0;
 	List	   *sortoperatorName = NIL;
@@ -142,6 +144,8 @@ DefineAggregate(ParseState *pstate,
 			serialfuncName = defGetQualifiedName(defel);
 		else if (strcmp(defel->defname, "deserialfunc") == 0)
 			deserialfuncName = defGetQualifiedName(defel);
+		else if (strcmp(defel->defname, "partialconverterfunc") == 0)
+			partialconverterfuncName = defGetQualifiedName(defel);
 		else if (strcmp(defel->defname, "msfunc") == 0)
 			mtransfuncName = defGetQualifiedName(defel);
 		else if (strcmp(defel->defname, "minvfunc") == 0)
@@ -189,6 +193,8 @@ DefineAggregate(ParseState *pstate,
 			minitval = defGetString(defel);
 		else if (strcmp(defel->defname, "parallel") == 0)
 			parallel = defGetString(defel);
+		else if (strcmp(defel->defname, "partial_pushdown_safe") == 0)
+			partialPushdownSafe = defGetBoolean(defel);
 		else
 			ereport(WARNING,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -372,6 +378,18 @@ DefineAggregate(ParseState *pstate,
 				 errmsg("must specify both or neither of serialization and deserialization functions")));
 	}
 
+	if (partialconverterfuncName)
+	{
+		/*
+		 * Converter is only needed/allowed for transtype INTERNAL.
+		 */
+		if (transTypeId != INTERNALOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+					 errmsg("partial converters may be specified only when the aggregate transition data type is %s",
+							format_type_be(INTERNALOID))));
+	}
+
 	/*
 	 * If a moving-aggregate transtype is specified, look that up.  Same
 	 * restrictions as for transtype.
@@ -457,6 +475,8 @@ DefineAggregate(ParseState *pstate,
 						   combinefuncName, /* combine function name */
 						   serialfuncName,	/* serial function name */
 						   deserialfuncName,	/* deserial function name */
+						   partialconverterfuncName,	/* partial converter function
+													 * name */
 						   mtransfuncName,	/* fwd trans function name */
 						   minvtransfuncName,	/* inv trans function name */
 						   mfinalfuncName,	/* final function name */
@@ -471,7 +491,8 @@ DefineAggregate(ParseState *pstate,
 						   mtransSpace, /* transition space */
 						   initval, /* initial condition */
 						   minitval,	/* initial condition */
-						   proparallel);	/* parallel safe? */
+						   proparallel, /* parallel safe? */
+						   partialPushdownSafe);	/* partial pushdown safe? */
 }
 
 /*
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 45547f6ae7f..398db29170b 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -5771,6 +5771,52 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(result);
 }
 
+/*
+ * int8_sum_to_internal_serialize
+ *		Convert int8 argument to serialized internal representation
+ */
+Datum
+int8_sum_to_internal_serialize(PG_FUNCTION_ARGS)
+{
+	PolyNumAggState *state = NULL;
+	StringInfoData buf;
+	bytea	   *result;
+	NumericVar	tmp_var;
+
+	state = (PolyNumAggState *) palloc0(sizeof(PolyNumAggState));
+	state->calcSumX2 = false;
+
+	if (!PG_ARGISNULL(0))
+	{
+#ifdef HAVE_INT128
+		do_int128_accum(state, (int128) PG_GETARG_INT64(0));
+#else
+		do_numeric_accum(state, int64_to_numeric(PG_GETARG_INT64(0)));
+#endif
+	}
+
+	init_var(&tmp_var);
+
+	pq_begintypsend(&buf);
+
+	/* N */
+	pq_sendint64(&buf, state->N);
+
+	/* sumX */
+#ifdef HAVE_INT128
+	int128_to_numericvar(state->sumX, &tmp_var);
+#else
+	accum_sum_final(&state->sumX, &tmp_var);
+#endif
+	numericvar_serialize(&buf, &tmp_var);
+
+	result = pq_endtypsend(&buf);
+
+	free_var(&tmp_var);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
 /*
  * Inverse transition functions to go with the above.
  */
@@ -5996,6 +6042,56 @@ numeric_sum(PG_FUNCTION_ARGS)
 	PG_RETURN_NUMERIC(result);
 }
 
+/*
+ * numeric_sum_to_internal_serialize
+ *		Convert numeric argument to serialized internal representation
+ */
+Datum
+numeric_sum_to_internal_serialize(PG_FUNCTION_ARGS)
+{
+	NumericAggState *state = NULL;
+	StringInfoData buf;
+	bytea	   *result;
+	NumericVar	tmp_var;
+
+	state = makeNumericAggStateCurrentContext(false);
+
+	if (!PG_ARGISNULL(0))
+		do_numeric_accum(state, PG_GETARG_NUMERIC(0));
+
+	init_var(&tmp_var);
+
+	pq_begintypsend(&buf);
+
+	/* N */
+	pq_sendint64(&buf, state->N);
+
+	/* sumX */
+	accum_sum_final(&state->sumX, &tmp_var);
+	numericvar_serialize(&buf, &tmp_var);
+
+	/* maxScale */
+	pq_sendint32(&buf, state->maxScale);
+
+	/* maxScaleCount */
+	pq_sendint64(&buf, state->maxScaleCount);
+
+	/* NaNcount */
+	pq_sendint64(&buf, state->NaNcount);
+
+	/* pInfcount */
+	pq_sendint64(&buf, state->pInfcount);
+
+	/* nInfcount */
+	pq_sendint64(&buf, state->nInfcount);
+
+	result = pq_endtypsend(&buf);
+
+	free_var(&tmp_var);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
 /*
  * Workhorse routine for the standard deviance and variance
  * aggregates. 'state' is aggregate's transition state.
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e5816c4ccea..c8e93967222 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -13246,11 +13246,13 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
 	const char *aggcombinefn;
 	const char *aggserialfn;
 	const char *aggdeserialfn;
+	const char *aggpartialconverterfn;
 	const char *aggmtransfn;
 	const char *aggminvtransfn;
 	const char *aggmfinalfn;
 	bool		aggfinalextra;
 	bool		aggmfinalextra;
+	bool		aggpartialpushdownsafe;
 	char		aggfinalmodify;
 	char		aggmfinalmodify;
 	const char *aggsortop;
@@ -13331,11 +13333,19 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
 		if (fout->remoteVersion >= 110000)
 			appendPQExpBufferStr(query,
 								 "aggfinalmodify,\n"
-								 "aggmfinalmodify\n");
+								 "aggmfinalmodify,\n");
 		else
 			appendPQExpBufferStr(query,
 								 "'0' AS aggfinalmodify,\n"
-								 "'0' AS aggmfinalmodify\n");
+								 "'0' AS aggmfinalmodify,\n");
+		if (fout->remoteVersion >= 150000)
+			appendPQExpBufferStr(query,
+								 "aggpartialconverterfn,\n"
+								 "aggpartialpushdownsafe\n");
+		else
+			appendPQExpBufferStr(query,
+								 "'-' AS aggpartialconverterfn,\n"
+								 "false as aggpartialpushdownsafe\n");
 
 		appendPQExpBufferStr(query,
 							 "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
@@ -13361,6 +13371,8 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
 	aggcombinefn = PQgetvalue(res, 0, PQfnumber(res, "aggcombinefn"));
 	aggserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggserialfn"));
 	aggdeserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggdeserialfn"));
+	aggpartialconverterfn = PQgetvalue(res, 0, PQfnumber(res, "aggpartialconverterfn"));
+	aggpartialpushdownsafe = (PQgetvalue(res, 0, PQfnumber(res, "aggpartialpushdownsafe"))[0] == 't');
 	aggmtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggmtransfn"));
 	aggminvtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggminvtransfn"));
 	aggmfinalfn = PQgetvalue(res, 0, PQfnumber(res, "aggmfinalfn"));
@@ -13450,6 +13462,10 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
 	if (strcmp(aggdeserialfn, "-") != 0)
 		appendPQExpBuffer(details, ",\n    DESERIALFUNC = %s", aggdeserialfn);
 
+	if (strcmp(aggpartialconverterfn, "-") != 0)
+		appendPQExpBuffer(details, ",\n    PARTIALCONVERTERFUNC = %s", aggpartialconverterfn);
+	if (aggpartialpushdownsafe)
+		appendPQExpBufferStr(details, ",\n    PARTIAL_PUSHDOWN_SAFE");
 	if (strcmp(aggmtransfn, "-") != 0)
 	{
 		appendPQExpBuffer(details, ",\n    MSFUNC = %s,\n    MINVFUNC = %s,\n    MSTYPE = %s",
diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat
index 2843f4b4159..86c77f778a5 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -56,26 +56,27 @@
   aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
   aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
   aggmfinalfn => 'numeric_poly_sum', aggtranstype => 'internal',
-  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+  aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+  aggpartialconverterfn => 'int8_sum_to_internal_serialize', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(int4)', aggtransfn => 'int4_sum', aggcombinefn => 'int8pl',
   aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
   aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
-  aggmtranstype => '_int8', aggminitval => '{0,0}' },
+  aggmtranstype => '_int8', aggminitval => '{0,0}', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(int2)', aggtransfn => 'int2_sum', aggcombinefn => 'int8pl',
   aggmtransfn => 'int2_avg_accum', aggminvtransfn => 'int2_avg_accum_inv',
   aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
-  aggmtranstype => '_int8', aggminitval => '{0,0}' },
+  aggmtranstype => '_int8', aggminitval => '{0,0}', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(float4)', aggtransfn => 'float4pl',
-  aggcombinefn => 'float4pl', aggtranstype => 'float4' },
+  aggcombinefn => 'float4pl', aggtranstype => 'float4', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(float8)', aggtransfn => 'float8pl',
-  aggcombinefn => 'float8pl', aggtranstype => 'float8' },
+  aggcombinefn => 'float8pl', aggtranstype => 'float8', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(money)', aggtransfn => 'cash_pl', aggcombinefn => 'cash_pl',
   aggmtransfn => 'cash_pl', aggminvtransfn => 'cash_mi',
-  aggtranstype => 'money', aggmtranstype => 'money' },
+  aggtranstype => 'money', aggmtranstype => 'money', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(interval)', aggtransfn => 'interval_pl',
   aggcombinefn => 'interval_pl', aggmtransfn => 'interval_pl',
   aggminvtransfn => 'interval_mi', aggtranstype => 'interval',
-  aggmtranstype => 'interval' },
+  aggmtranstype => 'interval', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'sum(numeric)', aggtransfn => 'numeric_avg_accum',
   aggfinalfn => 'numeric_sum', aggcombinefn => 'numeric_avg_combine',
   aggserialfn => 'numeric_avg_serialize',
@@ -83,152 +84,155 @@
   aggmtransfn => 'numeric_avg_accum', aggminvtransfn => 'numeric_accum_inv',
   aggmfinalfn => 'numeric_sum', aggtranstype => 'internal',
   aggtransspace => '128', aggmtranstype => 'internal',
-  aggmtransspace => '128' },
+  aggmtransspace => '128', aggpartialconverterfn => 'numeric_sum_to_internal_serialize', aggpartialpushdownsafe => 't' },
 
 # max
 { aggfnoid => 'max(int8)', aggtransfn => 'int8larger',
   aggcombinefn => 'int8larger', aggsortop => '>(int8,int8)',
-  aggtranstype => 'int8' },
+  aggtranstype => 'int8', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(int4)', aggtransfn => 'int4larger',
   aggcombinefn => 'int4larger', aggsortop => '>(int4,int4)',
-  aggtranstype => 'int4' },
+  aggtranstype => 'int4', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(int2)', aggtransfn => 'int2larger',
   aggcombinefn => 'int2larger', aggsortop => '>(int2,int2)',
-  aggtranstype => 'int2' },
+  aggtranstype => 'int2', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(oid)', aggtransfn => 'oidlarger',
   aggcombinefn => 'oidlarger', aggsortop => '>(oid,oid)',
-  aggtranstype => 'oid' },
+  aggtranstype => 'oid', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(float4)', aggtransfn => 'float4larger',
   aggcombinefn => 'float4larger', aggsortop => '>(float4,float4)',
-  aggtranstype => 'float4' },
+  aggtranstype => 'float4', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(float8)', aggtransfn => 'float8larger',
   aggcombinefn => 'float8larger', aggsortop => '>(float8,float8)',
-  aggtranstype => 'float8' },
+  aggtranstype => 'float8', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(date)', aggtransfn => 'date_larger',
   aggcombinefn => 'date_larger', aggsortop => '>(date,date)',
-  aggtranstype => 'date' },
+  aggtranstype => 'date', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(time)', aggtransfn => 'time_larger',
   aggcombinefn => 'time_larger', aggsortop => '>(time,time)',
-  aggtranstype => 'time' },
+  aggtranstype => 'time', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(timetz)', aggtransfn => 'timetz_larger',
   aggcombinefn => 'timetz_larger', aggsortop => '>(timetz,timetz)',
-  aggtranstype => 'timetz' },
+  aggtranstype => 'timetz', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(money)', aggtransfn => 'cashlarger',
   aggcombinefn => 'cashlarger', aggsortop => '>(money,money)',
-  aggtranstype => 'money' },
+  aggtranstype => 'money', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(timestamp)', aggtransfn => 'timestamp_larger',
   aggcombinefn => 'timestamp_larger', aggsortop => '>(timestamp,timestamp)',
-  aggtranstype => 'timestamp' },
+  aggtranstype => 'timestamp', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(timestamptz)', aggtransfn => 'timestamptz_larger',
   aggcombinefn => 'timestamptz_larger',
   aggsortop => '>(timestamptz,timestamptz)', aggtranstype => 'timestamptz' },
 { aggfnoid => 'max(interval)', aggtransfn => 'interval_larger',
   aggcombinefn => 'interval_larger', aggsortop => '>(interval,interval)',
-  aggtranstype => 'interval' },
+  aggtranstype => 'interval', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(text)', aggtransfn => 'text_larger',
   aggcombinefn => 'text_larger', aggsortop => '>(text,text)',
-  aggtranstype => 'text' },
+  aggtranstype => 'text', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(numeric)', aggtransfn => 'numeric_larger',
   aggcombinefn => 'numeric_larger', aggsortop => '>(numeric,numeric)',
-  aggtranstype => 'numeric' },
+  aggtranstype => 'numeric', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(anyarray)', aggtransfn => 'array_larger',
   aggcombinefn => 'array_larger', aggsortop => '>(anyarray,anyarray)',
-  aggtranstype => 'anyarray' },
+  aggtranstype => 'anyarray', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(bpchar)', aggtransfn => 'bpchar_larger',
   aggcombinefn => 'bpchar_larger', aggsortop => '>(bpchar,bpchar)',
-  aggtranstype => 'bpchar' },
+  aggtranstype => 'bpchar', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(tid)', aggtransfn => 'tidlarger',
   aggcombinefn => 'tidlarger', aggsortop => '>(tid,tid)',
-  aggtranstype => 'tid' },
+  aggtranstype => 'tid', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(anyenum)', aggtransfn => 'enum_larger',
   aggcombinefn => 'enum_larger', aggsortop => '>(anyenum,anyenum)',
-  aggtranstype => 'anyenum' },
+  aggtranstype => 'anyenum', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(inet)', aggtransfn => 'network_larger',
   aggcombinefn => 'network_larger', aggsortop => '>(inet,inet)',
-  aggtranstype => 'inet' },
+  aggtranstype => 'inet', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(pg_lsn)', aggtransfn => 'pg_lsn_larger',
   aggcombinefn => 'pg_lsn_larger', aggsortop => '>(pg_lsn,pg_lsn)',
-  aggtranstype => 'pg_lsn' },
+  aggtranstype => 'pg_lsn', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'max(xid8)', aggtransfn => 'xid8_larger',
   aggcombinefn => 'xid8_larger', aggsortop => '>(xid8,xid8)',
-  aggtranstype => 'xid8' },
+  aggtranstype => 'xid8', aggpartialpushdownsafe => 't' },
 
 # min
 { aggfnoid => 'min(int8)', aggtransfn => 'int8smaller',
   aggcombinefn => 'int8smaller', aggsortop => '<(int8,int8)',
-  aggtranstype => 'int8' },
+  aggtranstype => 'int8', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(int4)', aggtransfn => 'int4smaller',
   aggcombinefn => 'int4smaller', aggsortop => '<(int4,int4)',
-  aggtranstype => 'int4' },
+  aggtranstype => 'int4', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(int2)', aggtransfn => 'int2smaller',
   aggcombinefn => 'int2smaller', aggsortop => '<(int2,int2)',
-  aggtranstype => 'int2' },
+  aggtranstype => 'int2', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(oid)', aggtransfn => 'oidsmaller',
   aggcombinefn => 'oidsmaller', aggsortop => '<(oid,oid)',
-  aggtranstype => 'oid' },
+  aggtranstype => 'oid', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(float4)', aggtransfn => 'float4smaller',
   aggcombinefn => 'float4smaller', aggsortop => '<(float4,float4)',
-  aggtranstype => 'float4' },
+  aggtranstype => 'float4', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(float8)', aggtransfn => 'float8smaller',
   aggcombinefn => 'float8smaller', aggsortop => '<(float8,float8)',
-  aggtranstype => 'float8' },
+  aggtranstype => 'float8', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(date)', aggtransfn => 'date_smaller',
   aggcombinefn => 'date_smaller', aggsortop => '<(date,date)',
-  aggtranstype => 'date' },
+  aggtranstype => 'date', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(time)', aggtransfn => 'time_smaller',
   aggcombinefn => 'time_smaller', aggsortop => '<(time,time)',
-  aggtranstype => 'time' },
+  aggtranstype => 'time', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(timetz)', aggtransfn => 'timetz_smaller',
   aggcombinefn => 'timetz_smaller', aggsortop => '<(timetz,timetz)',
-  aggtranstype => 'timetz' },
+  aggtranstype => 'timetz', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(money)', aggtransfn => 'cashsmaller',
   aggcombinefn => 'cashsmaller', aggsortop => '<(money,money)',
-  aggtranstype => 'money' },
+  aggtranstype => 'money', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(timestamp)', aggtransfn => 'timestamp_smaller',
   aggcombinefn => 'timestamp_smaller', aggsortop => '<(timestamp,timestamp)',
-  aggtranstype => 'timestamp' },
+  aggtranstype => 'timestamp', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(timestamptz)', aggtransfn => 'timestamptz_smaller',
   aggcombinefn => 'timestamptz_smaller',
-  aggsortop => '<(timestamptz,timestamptz)', aggtranstype => 'timestamptz' },
+  aggsortop => '<(timestamptz,timestamptz)', aggtranstype => 'timestamptz',
+  aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(interval)', aggtransfn => 'interval_smaller',
   aggcombinefn => 'interval_smaller', aggsortop => '<(interval,interval)',
-  aggtranstype => 'interval' },
+  aggtranstype => 'interval', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(text)', aggtransfn => 'text_smaller',
   aggcombinefn => 'text_smaller', aggsortop => '<(text,text)',
-  aggtranstype => 'text' },
+  aggtranstype => 'text', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(numeric)', aggtransfn => 'numeric_smaller',
   aggcombinefn => 'numeric_smaller', aggsortop => '<(numeric,numeric)',
-  aggtranstype => 'numeric' },
+  aggtranstype => 'numeric', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(anyarray)', aggtransfn => 'array_smaller',
   aggcombinefn => 'array_smaller', aggsortop => '<(anyarray,anyarray)',
-  aggtranstype => 'anyarray' },
+  aggtranstype => 'anyarray', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(bpchar)', aggtransfn => 'bpchar_smaller',
   aggcombinefn => 'bpchar_smaller', aggsortop => '<(bpchar,bpchar)',
-  aggtranstype => 'bpchar' },
+  aggtranstype => 'bpchar', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(tid)', aggtransfn => 'tidsmaller',
   aggcombinefn => 'tidsmaller', aggsortop => '<(tid,tid)',
-  aggtranstype => 'tid' },
+  aggtranstype => 'tid', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(anyenum)', aggtransfn => 'enum_smaller',
   aggcombinefn => 'enum_smaller', aggsortop => '<(anyenum,anyenum)',
-  aggtranstype => 'anyenum' },
+  aggtranstype => 'anyenum', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(inet)', aggtransfn => 'network_smaller',
   aggcombinefn => 'network_smaller', aggsortop => '<(inet,inet)',
-  aggtranstype => 'inet' },
+  aggtranstype => 'inet', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(pg_lsn)', aggtransfn => 'pg_lsn_smaller',
   aggcombinefn => 'pg_lsn_smaller', aggsortop => '<(pg_lsn,pg_lsn)',
-  aggtranstype => 'pg_lsn' },
+  aggtranstype => 'pg_lsn', aggpartialpushdownsafe => 't' },
 { aggfnoid => 'min(xid8)', aggtransfn => 'xid8_smaller',
   aggcombinefn => 'xid8_smaller', aggsortop => '<(xid8,xid8)',
-  aggtranstype => 'xid8' },
+  aggtranstype => 'xid8', aggpartialpushdownsafe => 't' },
 
 # count
 { aggfnoid => 'count(any)', aggtransfn => 'int8inc_any',
   aggcombinefn => 'int8pl', aggmtransfn => 'int8inc_any',
   aggminvtransfn => 'int8dec_any', aggtranstype => 'int8',
-  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0' },
+  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0',
+  aggpartialpushdownsafe => 't' },
 { aggfnoid => 'count()', aggtransfn => 'int8inc', aggcombinefn => 'int8pl',
   aggmtransfn => 'int8inc', aggminvtransfn => 'int8dec', aggtranstype => 'int8',
-  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0' },
+  aggmtranstype => 'int8', agginitval => '0', aggminitval => '0',
+  aggpartialpushdownsafe => 't' },
 
 # var_pop
 { aggfnoid => 'var_pop(int8)', aggtransfn => 'int8_accum',
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 593da9f76a8..3b8e541532d 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -55,6 +55,9 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
 	/* function to convert bytea to transtype (0 if none) */
 	regproc		aggdeserialfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
 
+	/* function to convert aggregate result to bytea (0 if none) */
+	regproc		aggpartialconverterfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+
 	/* forward function for moving-aggregate mode (0 if none) */
 	regproc		aggmtransfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
 
@@ -91,6 +94,9 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
 	/* estimated size of moving-agg state (0 for default est) */
 	int32		aggmtransspace BKI_DEFAULT(0);
 
+	/* true if partial aggregate is fine to push down */
+	bool		aggpartialpushdownsafe BKI_DEFAULT(f);
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 
 	/* initial value for transition state (can be NULL) */
@@ -161,6 +167,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
 									 List *aggcombinefnName,
 									 List *aggserialfnName,
 									 List *aggdeserialfnName,
+									 List *aggpartialconverterfnName,
 									 List *aggmtransfnName,
 									 List *aggminvtransfnName,
 									 List *aggmfinalfnName,
@@ -175,6 +182,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
 									 int32 aggmTransSpace,
 									 const char *agginitval,
 									 const char *aggminitval,
-									 char proparallel);
+									 char proparallel,
+									 bool partialPushdownSafe);
 
 #endif							/* PG_AGGREGATE_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d8e8715ed1c..3450b526b14 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4790,6 +4790,12 @@
 { oid => '2786', descr => 'aggregate serial function',
   proname => 'int8_avg_serialize', prorettype => 'bytea',
   proargtypes => 'internal', prosrc => 'int8_avg_serialize' },
+{ oid => '9630', descr => 'partial aggregate converter function',
+  proname => 'int8_sum_to_internal_serialize', prorettype => 'bytea',
+  proargtypes => 'int8', prosrc => 'int8_sum_to_internal_serialize' },
+{ oid => '9631', descr => 'partial aggregate converter function',
+  proname => 'numeric_sum_to_internal_serialize', prorettype => 'bytea',
+  proargtypes => 'numeric', prosrc => 'numeric_sum_to_internal_serialize' },
 { oid => '2787', descr => 'aggregate deserial function',
   proname => 'int8_avg_deserialize', prorettype => 'internal',
   proargtypes => 'bytea internal', prosrc => 'int8_avg_deserialize' },
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be3..864151bbca3 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -145,6 +145,7 @@ NOTICE:  checking pg_aggregate {aggfinalfn} => pg_proc {oid}
 NOTICE:  checking pg_aggregate {aggcombinefn} => pg_proc {oid}
 NOTICE:  checking pg_aggregate {aggserialfn} => pg_proc {oid}
 NOTICE:  checking pg_aggregate {aggdeserialfn} => pg_proc {oid}
+NOTICE:  checking pg_aggregate {aggpartialconverterfn} => pg_proc {oid}
 NOTICE:  checking pg_aggregate {aggmtransfn} => pg_proc {oid}
 NOTICE:  checking pg_aggregate {aggminvtransfn} => pg_proc {oid}
 NOTICE:  checking pg_aggregate {aggmfinalfn} => pg_proc {oid}
-- 
2.25.1

Reply via email to