On Wed, 20 Jun 2018 at 19:08, David Rowley <david.row...@2ndquadrant.com> wrote:
> I'll submit it again when there more consensus that we want this.

Waking up this old thread again. If you don't have a copy, the entire
thread is in [1].

The remaining item that seemed to cause this patch to be rejected was
raised in [2]. The summary of that was that it might not be a good
idea to allow parallel aggregation of string_agg() and array_agg() as
there might be some people who rely on the current ordering they get
without having an ORDER BY clause in the aggregate function call.  Tom
mentioned in [3] that users might not want to add an ORDER BY to their
aggregate function because the performance of it is terrible.  That
was true up until 1349d2790 [4], where I changed how ORDER BY /
DISTINCT aggregation worked to allow the planner to provide pre-sorted
input rather than always having nodeAgg.c do the sorting.  I think
this removes quite a lot of the argument against the patch, but not
all of it.  So here goes testing the water on seeing if any opinions
have changed over the past few years?

A rebased patch is attached.

David

[1] 
https://www.postgresql.org/message-id/flat/CAKJS1f98yPkRMsE0JnDh72%3DAQEUuE3atiCJtPVCtjhFwzCRJHQ%40mail.gmail.com#8bbce15b9279d2da2da99071f732a99d
[2] https://www.postgresql.org/message-id/6538.1522096...@sss.pgh.pa.us
[3] https://www.postgresql.org/message-id/18594.1522099...@sss.pgh.pa.us
[4] 
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=1349d2790bf48a4de072931c722f39337e72055e
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 053d4dc650..d500cf151b 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -20651,7 +20651,7 @@ SELECT NULLIF(value, '(none)') ...
        <para>
         Collects all the input values, including nulls, into an array.
        </para></entry>
-       <entry>No</entry>
+       <entry>Yes</entry>
       </row>
 
       <row>
@@ -20664,7 +20664,7 @@ SELECT NULLIF(value, '(none)') ...
         dimension.  (The inputs must all have the same dimensionality, and
         cannot be empty or null.)
        </para></entry>
-       <entry>No</entry>
+       <entry>Yes</entry>
       </row>
 
       <row>
@@ -21115,7 +21115,7 @@ SELECT NULLIF(value, '(none)') ...
         after the first is preceded by the
         corresponding <parameter>delimiter</parameter> (if it's not null).
        </para></entry>
-       <entry>No</entry>
+       <entry>Yes</entry>
       </row>
 
       <row>
diff --git a/src/backend/optimizer/prep/prepagg.c 
b/src/backend/optimizer/prep/prepagg.c
index da89b55402..374d0dda6b 100644
--- a/src/backend/optimizer/prep/prepagg.c
+++ b/src/backend/optimizer/prep/prepagg.c
@@ -305,10 +305,30 @@ preprocess_aggref(Aggref *aggref, PlannerInfo *root)
                                 * functions; if not, we can't serialize 
partial-aggregation
                                 * results.
                                 */
-                               else if (transinfo->aggtranstype == INTERNALOID 
&&
-                                                
(!OidIsValid(transinfo->serialfn_oid) ||
-                                                 
!OidIsValid(transinfo->deserialfn_oid)))
-                                       root->hasNonSerialAggs = true;
+                               else if (transinfo->aggtranstype == INTERNALOID)
+                               {
+
+                                       if 
(!OidIsValid(transinfo->serialfn_oid) ||
+                                               
!OidIsValid(transinfo->deserialfn_oid))
+                                               root->hasNonSerialAggs = true;
+
+                                       /*
+                                        * array_agg_serialize and 
array_agg_deserialize make use
+                                        * of the aggregate non-byval input 
type's send and
+                                        * receive functions.  There's a chance 
that the type
+                                        * being aggregated has one or both of 
these functions
+                                        * missing.  In this case we must not 
allow the
+                                        * aggregate's serial and deserial 
functions to be used.
+                                        * It would be nice not to have special 
case this and
+                                        * instead provide some sort of 
supporting function within
+                                        * the aggregate to do this, but for 
now, that seems like
+                                        * overkill for this one case.
+                                        */
+                                       if ((transinfo->serialfn_oid == 
F_ARRAY_AGG_SERIALIZE ||
+                                                transinfo->deserialfn_oid == 
F_ARRAY_AGG_DESERIALIZE) &&
+                                               
!agg_args_support_sendreceive(aggref))
+                                               root->hasNonSerialAggs = true;
+                               }
                        }
                }
                agginfo->transno = transno;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 3ef9e8ee5e..8fda0c25ae 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "access/htup_details.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_type.h"
@@ -28,7 +29,7 @@
 #include "rewrite/rewriteManip.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
-
+#include "utils/syscache.h"
 
 typedef struct
 {
@@ -1943,6 +1944,40 @@ resolve_aggregate_transtype(Oid aggfuncid,
        return aggtranstype;
 }
 
+/*
+ * agg_args_support_sendreceive
+ *             Returns true if all non-byval of aggref's arg types have send 
and
+ *             receive functions.
+ */
+bool
+agg_args_support_sendreceive(Aggref *aggref)
+{
+       ListCell   *lc;
+
+       foreach(lc, aggref->args)
+       {
+               HeapTuple       typeTuple;
+               Form_pg_type pt;
+               TargetEntry *tle = (TargetEntry *) lfirst(lc);
+               Oid                     type = exprType((Node *) tle->expr);
+
+               typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
+               if (!HeapTupleIsValid(typeTuple))
+                       elog(ERROR, "cache lookup failed for type %u", type);
+
+               pt = (Form_pg_type) GETSTRUCT(typeTuple);
+
+               if (!pt->typbyval &&
+                       (!OidIsValid(pt->typsend) || 
!OidIsValid(pt->typreceive)))
+               {
+                       ReleaseSysCache(typeTuple);
+                       return false;
+               }
+               ReleaseSysCache(typeTuple);
+       }
+       return true;
+}
+
 /*
  * Create an expression tree for the transition function of an aggregate.
  * This is needed so that polymorphic functions can be used within an
diff --git a/src/backend/utils/adt/array_userfuncs.c 
b/src/backend/utils/adt/array_userfuncs.c
index ca70590d7d..10b8764e5f 100644
--- a/src/backend/utils/adt/array_userfuncs.c
+++ b/src/backend/utils/adt/array_userfuncs.c
@@ -13,12 +13,33 @@
 #include "postgres.h"
 
 #include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
 #include "common/int.h"
+#include "port/pg_bitutils.h"
 #include "utils/array.h"
+#include "utils/datum.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
 
+/*
+ * SerialIOData
+ *             Used for caching element-type data in array_agg_serialize
+ */
+typedef struct SerialIOData
+{
+       FmgrInfo        typsend;
+} SerialIOData;
+
+/*
+ * DeserialIOData
+ *             Used for caching element-type data in array_agg_deserialize
+ */
+typedef struct DeserialIOData
+{
+       FmgrInfo        typreceive;
+       Oid                     typioparam;
+} DeserialIOData;
 
 static Datum array_position_common(FunctionCallInfo fcinfo);
 
@@ -499,6 +520,318 @@ array_agg_transfn(PG_FUNCTION_ARGS)
        PG_RETURN_POINTER(state);
 }
 
+Datum
+array_agg_combine(PG_FUNCTION_ARGS)
+{
+       ArrayBuildState *state1;
+       ArrayBuildState *state2;
+       MemoryContext agg_context;
+       MemoryContext old_context;
+       int                     i;
+
+       if (!AggCheckCallContext(fcinfo, &agg_context))
+               elog(ERROR, "aggregate function called in non-aggregate 
context");
+
+       state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) 
PG_GETARG_POINTER(0);
+       state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) 
PG_GETARG_POINTER(1);
+
+       if (state2 == NULL)
+       {
+               /*
+                * NULL state2 is easy, just return state1, which we know is 
already
+                * in the agg_context
+                */
+               if (state1 == NULL)
+                       PG_RETURN_NULL();
+               PG_RETURN_POINTER(state1);
+       }
+
+       if (state1 == NULL)
+       {
+               /* We must copy state2's data into the agg_context */
+               state1 = initArrayResultWithSize(state2->element_type, 
agg_context,
+                                                                               
 false, state2->alen);
+
+               old_context = MemoryContextSwitchTo(agg_context);
+
+               for (i = 0; i < state2->nelems; i++)
+               {
+                       if (!state2->dnulls[i])
+                               state1->dvalues[i] = 
datumCopy(state2->dvalues[i],
+                                                                               
           state1->typbyval,
+                                                                               
           state1->typlen);
+                       else
+                               state1->dvalues[i] = (Datum) 0;
+               }
+
+               MemoryContextSwitchTo(old_context);
+
+               memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * 
state2->nelems);
+
+               state1->nelems = state2->nelems;
+
+               PG_RETURN_POINTER(state1);
+       }
+       else if (state2->nelems > 0)
+       {
+               /* We only need to combine the two states if state2 has any 
elements */
+               int                     reqsize = state1->nelems + 
state2->nelems;
+               MemoryContext oldContext = 
MemoryContextSwitchTo(state1->mcontext);
+
+               Assert(state1->element_type == state2->element_type);
+
+               /* Enlarge state1 arrays if needed */
+               if (state1->alen < reqsize)
+               {
+                       /* Use a power of 2 size rather than allocating just 
reqsize */
+                       state1->alen = pg_nextpower2_32(reqsize);
+                       state1->dvalues = (Datum *) repalloc(state1->dvalues,
+                                                                               
                 state1->alen * sizeof(Datum));
+                       state1->dnulls = (bool *) repalloc(state1->dnulls,
+                                                                               
           state1->alen * sizeof(bool));
+               }
+
+               /* Copy in the state2 elements to the end of the state1 arrays 
*/
+               for (i = 0; i < state2->nelems; i++)
+               {
+                       if (!state2->dnulls[i])
+                               state1->dvalues[i + state1->nelems] =
+                                       datumCopy(state2->dvalues[i],
+                                                         state1->typbyval,
+                                                         state1->typlen);
+                       else
+                               state1->dvalues[i + state1->nelems] = (Datum) 0;
+               }
+
+               memcpy(&state1->dnulls[state1->nelems], state2->dnulls,
+                          sizeof(bool) * state2->nelems);
+
+               state1->nelems = reqsize;
+
+               MemoryContextSwitchTo(oldContext);
+       }
+
+       PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_serialize
+ *             Serialize ArrayBuildState into bytea.
+ */
+Datum
+array_agg_serialize(PG_FUNCTION_ARGS)
+{
+       ArrayBuildState *state;
+       StringInfoData buf;
+       bytea      *result;
+
+       /* cannot be called directly because of internal-type argument */
+       Assert(AggCheckCallContext(fcinfo, NULL));
+
+       state = (ArrayBuildState *) PG_GETARG_POINTER(0);
+
+       pq_begintypsend(&buf);
+
+       /*
+        * element_type. Putting this first is more convenient in 
deserialization
+        */
+       pq_sendint32(&buf, state->element_type);
+
+       /*
+        * nelems -- send first so we know how large to make the dvalues and
+        * dnulls array during deserialization.
+        */
+       pq_sendint64(&buf, state->nelems);
+
+       /* alen can be decided during deserialization */
+
+       /* typlen */
+       pq_sendint16(&buf, state->typlen);
+
+       /* typbyval */
+       pq_sendbyte(&buf, state->typbyval);
+
+       /* typalign */
+       pq_sendbyte(&buf, state->typalign);
+
+       /* dnulls */
+       pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * 
state->nelems);
+
+       /*
+        * dvalues.  By agreement with array_agg_deserialize, when the element
+        * type is byval, we just transmit the Datum array as-is, including any
+        * null elements.  For by-ref types, we must invoke the element type's
+        * send function, and we skip null elements (which is why the nulls 
flags
+        * must be sent first).
+        */
+       if (state->typbyval)
+               pq_sendbytes(&buf, (char *) state->dvalues,
+                                        sizeof(Datum) * state->nelems);
+       else
+       {
+               SerialIOData *iodata;
+               int                     i;
+
+               /* Avoid repeat catalog lookups for typsend function */
+               iodata = (SerialIOData *) fcinfo->flinfo->fn_extra;
+               if (iodata == NULL)
+               {
+                       Oid                     typsend;
+                       bool            typisvarlena;
+
+                       iodata = (SerialIOData *)
+                               MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+                                                                  
sizeof(SerialIOData));
+                       getTypeBinaryOutputInfo(state->element_type, &typsend,
+                                                                       
&typisvarlena);
+                       fmgr_info_cxt(typsend, &iodata->typsend,
+                                                 fcinfo->flinfo->fn_mcxt);
+                       fcinfo->flinfo->fn_extra = (void *) iodata;
+               }
+
+               for (i = 0; i < state->nelems; i++)
+               {
+                       bytea      *outputbytes;
+
+                       if (state->dnulls[i])
+                               continue;
+                       outputbytes = SendFunctionCall(&iodata->typsend,
+                                                                               
   state->dvalues[i]);
+                       pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
+                       pq_sendbytes(&buf, VARDATA(outputbytes),
+                                                VARSIZE(outputbytes) - 
VARHDRSZ);
+               }
+       }
+
+       result = pq_endtypsend(&buf);
+
+       PG_RETURN_BYTEA_P(result);
+}
+
+Datum
+array_agg_deserialize(PG_FUNCTION_ARGS)
+{
+       bytea      *sstate;
+       ArrayBuildState *result;
+       StringInfoData buf;
+       Oid                     element_type;
+       int64           nelems;
+       const char *temp;
+
+       if (!AggCheckCallContext(fcinfo, NULL))
+               elog(ERROR, "aggregate function called in non-aggregate 
context");
+
+       sstate = PG_GETARG_BYTEA_PP(0);
+
+       /*
+        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
+        * standard recv-function infrastructure.
+        */
+       initStringInfo(&buf);
+       appendBinaryStringInfo(&buf,
+                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+
+       /* element_type */
+       element_type = pq_getmsgint(&buf, 4);
+
+       /* nelems */
+       nelems = pq_getmsgint64(&buf);
+
+       /* Create output ArrayBuildState with the needed number of elements */
+       result = initArrayResultWithSize(element_type, CurrentMemoryContext,
+                                                                        false, 
nelems);
+       result->nelems = nelems;
+
+       /* typlen */
+       result->typlen = pq_getmsgint(&buf, 2);
+
+       /* typbyval */
+       result->typbyval = pq_getmsgbyte(&buf);
+
+       /* typalign */
+       result->typalign = pq_getmsgbyte(&buf);
+
+       /* dnulls */
+       temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems);
+       memcpy(result->dnulls, temp, sizeof(bool) * nelems);
+
+       /* dvalues --- see comment in array_agg_serialize */
+       if (result->typbyval)
+       {
+               temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems);
+               memcpy(result->dvalues, temp, sizeof(Datum) * nelems);
+       }
+       else
+       {
+               DeserialIOData *iodata;
+               int                     i;
+
+               /* Avoid repeat catalog lookups for typreceive function */
+               iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra;
+               if (iodata == NULL)
+               {
+                       Oid                     typreceive;
+
+                       iodata = (DeserialIOData *)
+                               MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+                                                                  
sizeof(DeserialIOData));
+                       getTypeBinaryInputInfo(element_type, &typreceive,
+                                                                  
&iodata->typioparam);
+                       fmgr_info_cxt(typreceive, &iodata->typreceive,
+                                                 fcinfo->flinfo->fn_mcxt);
+                       fcinfo->flinfo->fn_extra = (void *) iodata;
+               }
+
+               for (i = 0; i < nelems; i++)
+               {
+                       int                     itemlen;
+                       StringInfoData elem_buf;
+                       char            csave;
+
+                       if (result->dnulls[i])
+                       {
+                               result->dvalues[i] = (Datum) 0;
+                               continue;
+                       }
+
+                       itemlen = pq_getmsgint(&buf, 4);
+                       if (itemlen < 0 || itemlen > (buf.len - buf.cursor))
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+                                                errmsg("insufficient data left 
in message")));
+
+                       /*
+                        * Rather than copying data around, we just set up a 
phony
+                        * StringInfo pointing to the correct portion of the 
input buffer.
+                        * We assume we can scribble on the input buffer so as 
to maintain
+                        * the convention that StringInfos have a trailing null.
+                        */
+                       elem_buf.data = &buf.data[buf.cursor];
+                       elem_buf.maxlen = itemlen + 1;
+                       elem_buf.len = itemlen;
+                       elem_buf.cursor = 0;
+
+                       buf.cursor += itemlen;
+
+                       csave = buf.data[buf.cursor];
+                       buf.data[buf.cursor] = '\0';
+
+                       /* Now call the element's receiveproc */
+                       result->dvalues[i] = 
ReceiveFunctionCall(&iodata->typreceive,
+                                                                               
                         &elem_buf,
+                                                                               
                         iodata->typioparam,
+                                                                               
                         -1);
+
+                       buf.data[buf.cursor] = csave;
+               }
+       }
+
+       pq_getmsgend(&buf);
+       pfree(buf.data);
+
+       PG_RETURN_POINTER(result);
+}
+
 Datum
 array_agg_finalfn(PG_FUNCTION_ARGS)
 {
@@ -578,6 +911,299 @@ array_agg_array_transfn(PG_FUNCTION_ARGS)
        PG_RETURN_POINTER(state);
 }
 
+Datum
+array_agg_array_combine(PG_FUNCTION_ARGS)
+{
+       ArrayBuildStateArr *state1;
+       ArrayBuildStateArr *state2;
+       MemoryContext agg_context;
+       MemoryContext old_context;
+
+       if (!AggCheckCallContext(fcinfo, &agg_context))
+               elog(ERROR, "aggregate function called in non-aggregate 
context");
+
+       state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) 
PG_GETARG_POINTER(0);
+       state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) 
PG_GETARG_POINTER(1);
+
+       if (state2 == NULL)
+       {
+               /*
+                * NULL state2 is easy, just return state1, which we know is 
already
+                * in the agg_context
+                */
+               if (state1 == NULL)
+                       PG_RETURN_NULL();
+               PG_RETURN_POINTER(state1);
+       }
+
+       if (state1 == NULL)
+       {
+               /* We must copy state2's data into the agg_context */
+               old_context = MemoryContextSwitchTo(agg_context);
+
+               state1 = initArrayResultArr(state2->array_type, InvalidOid,
+                                                                       
agg_context, false);
+
+               state1->abytes = state2->abytes;
+               state1->data = (char *) palloc(state1->abytes);
+
+               if (state2->nullbitmap)
+               {
+                       int                     size = (state2->aitems + 7) / 8;
+
+                       state1->nullbitmap = (bits8 *) palloc(size);
+                       memcpy(state1->nullbitmap, state2->nullbitmap, size);
+               }
+
+               memcpy(state1->data, state2->data, state2->nbytes);
+               state1->nbytes = state2->nbytes;
+               state1->aitems = state2->aitems;
+               state1->nitems = state2->nitems;
+               state1->ndims = state2->ndims;
+               memcpy(state1->dims, state2->dims, sizeof(state2->dims));
+               memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs));
+               state1->array_type = state2->array_type;
+               state1->element_type = state2->element_type;
+
+               MemoryContextSwitchTo(old_context);
+
+               PG_RETURN_POINTER(state1);
+       }
+
+       /* We only need to combine the two states if state2 has any items */
+       else if (state2->nitems > 0)
+       {
+               MemoryContext oldContext;
+               int                     reqsize = state1->nbytes + 
state2->nbytes;
+               int                     i;
+
+               /*
+                * Check the states are compatible with each other.  Ensure we 
use the
+                * same error messages that are listed in accumArrayResultArr 
so that
+                * the same error is shown as would have been if we'd not used 
the
+                * combine function for the aggregation.
+                */
+               if (state1->ndims != state2->ndims)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+                                        errmsg("cannot accumulate arrays of 
different dimensionality")));
+
+               /* Check dimensions match ignoring the first dimension. */
+               for (i = 1; i < state1->ndims; i++)
+               {
+                       if (state1->dims[i] != state2->dims[i] || 
state1->lbs[i] != state2->lbs[i])
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+                                                errmsg("cannot accumulate 
arrays of different dimensionality")));
+               }
+
+
+               oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+               /*
+                * If there's not enough space in state1 then we'll need to 
reallocate
+                * more.
+                */
+               if (state1->abytes < reqsize)
+               {
+                       /* use a power of 2 size rather than allocating just 
reqsize */
+                       state1->abytes = pg_nextpower2_32(reqsize);
+                       state1->data = (char *) repalloc(state1->data, 
state1->abytes);
+               }
+
+               if (state2->nullbitmap)
+               {
+                       int                     newnitems = state1->nitems + 
state2->nitems;
+
+                       if (state1->nullbitmap == NULL)
+                       {
+                               /*
+                                * First input with nulls; we must 
retrospectively handle any
+                                * previous inputs by marking all their items 
non-null.
+                                */
+                               state1->aitems = pg_nextpower2_32(Max(256, 
newnitems + 1));
+                               state1->nullbitmap = (bits8 *) 
palloc((state1->aitems + 7) / 8);
+                               array_bitmap_copy(state1->nullbitmap, 0,
+                                                                 NULL, 0,
+                                                                 
state1->nitems);
+                       }
+                       else if (newnitems > state1->aitems)
+                       {
+                               int                     newaitems = 
state1->aitems + state2->aitems;
+
+                               state1->aitems = pg_nextpower2_32(newaitems);
+                               state1->nullbitmap = (bits8 *)
+                                       repalloc(state1->nullbitmap, 
(state1->aitems + 7) / 8);
+                       }
+                       array_bitmap_copy(state1->nullbitmap, state1->nitems,
+                                                         state2->nullbitmap, 0,
+                                                         state2->nitems);
+               }
+
+               memcpy(state1->data + state1->nbytes, state2->data, 
state2->nbytes);
+               state1->nbytes += state2->nbytes;
+               state1->nitems += state2->nitems;
+
+               state1->dims[0] += state2->dims[0];
+               /* remaing dims already match, per test above */
+
+               Assert(state1->array_type == state2->array_type);
+               Assert(state1->element_type == state2->element_type);
+
+               MemoryContextSwitchTo(oldContext);
+       }
+
+       PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_array_serialize
+ *             Serialize ArrayBuildStateArr into bytea.
+ */
+Datum
+array_agg_array_serialize(PG_FUNCTION_ARGS)
+{
+       ArrayBuildStateArr *state;
+       StringInfoData buf;
+       bytea      *result;
+
+       /* cannot be called directly because of internal-type argument */
+       Assert(AggCheckCallContext(fcinfo, NULL));
+
+       state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+
+       pq_begintypsend(&buf);
+
+       /*
+        * element_type. Putting this first is more convenient in 
deserialization
+        * so that we can init the new state sooner.
+        */
+       pq_sendint32(&buf, state->element_type);
+
+       /* array_type */
+       pq_sendint32(&buf, state->array_type);
+
+       /* nbytes */
+       pq_sendint32(&buf, state->nbytes);
+
+       /* data */
+       pq_sendbytes(&buf, state->data, state->nbytes);
+
+       /* abytes */
+       pq_sendint32(&buf, state->abytes);
+
+       /* aitems */
+       pq_sendint32(&buf, state->aitems);
+
+       /* nullbitmap */
+       if (state->nullbitmap)
+       {
+               Assert(state->aitems > 0);
+               pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 
7) / 8);
+       }
+
+       /* nitems */
+       pq_sendint32(&buf, state->nitems);
+
+       /* ndims */
+       pq_sendint32(&buf, state->ndims);
+
+       /* dims: XXX should we just send ndims elements? */
+       pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims));
+
+       /* lbs */
+       pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs));
+
+       result = pq_endtypsend(&buf);
+
+       PG_RETURN_BYTEA_P(result);
+}
+
+Datum
+array_agg_array_deserialize(PG_FUNCTION_ARGS)
+{
+       bytea      *sstate;
+       ArrayBuildStateArr *result;
+       StringInfoData buf;
+       Oid                     element_type;
+       Oid                     array_type;
+       int                     nbytes;
+       const char *temp;
+
+       /* cannot be called directly because of internal-type argument */
+       Assert(AggCheckCallContext(fcinfo, NULL));
+
+       sstate = PG_GETARG_BYTEA_PP(0);
+
+       /*
+        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
+        * standard recv-function infrastructure.
+        */
+       initStringInfo(&buf);
+       appendBinaryStringInfo(&buf,
+                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+
+       /* element_type */
+       element_type = pq_getmsgint(&buf, 4);
+
+       /* array_type */
+       array_type = pq_getmsgint(&buf, 4);
+
+       /* nbytes */
+       nbytes = pq_getmsgint(&buf, 4);
+
+       result = initArrayResultArr(array_type, element_type,
+                                                               
CurrentMemoryContext, false);
+
+       result->abytes = 1024;
+       while (result->abytes < nbytes)
+               result->abytes *= 2;
+
+       result->data = (char *) palloc(result->abytes);
+
+       /* data */
+       temp = pq_getmsgbytes(&buf, nbytes);
+       memcpy(result->data, temp, nbytes);
+       result->nbytes = nbytes;
+
+       /* abytes */
+       result->abytes = pq_getmsgint(&buf, 4);
+
+       /* aitems: might be 0 */
+       result->aitems = pq_getmsgint(&buf, 4);
+
+       /* nullbitmap */
+       if (result->aitems > 0)
+       {
+               int                     size = (result->aitems + 7) / 8;
+
+               result->nullbitmap = (bits8 *) palloc(size);
+               temp = pq_getmsgbytes(&buf, size);
+               memcpy(result->nullbitmap, temp, size);
+       }
+       else
+               result->nullbitmap = NULL;
+
+       /* nitems */
+       result->nitems = pq_getmsgint(&buf, 4);
+
+       /* ndims */
+       result->ndims = pq_getmsgint(&buf, 4);
+
+       /* dims */
+       temp = pq_getmsgbytes(&buf, sizeof(result->dims));
+       memcpy(result->dims, temp, sizeof(result->dims));
+
+       /* lbs */
+       temp = pq_getmsgbytes(&buf, sizeof(result->lbs));
+       memcpy(result->lbs, temp, sizeof(result->lbs));
+
+       pq_getmsgend(&buf);
+       pfree(buf.data);
+
+       PG_RETURN_POINTER(result);
+}
+
 Datum
 array_agg_array_finalfn(PG_FUNCTION_ARGS)
 {
diff --git a/src/backend/utils/adt/arrayfuncs.c 
b/src/backend/utils/adt/arrayfuncs.c
index 495e449a9e..dc3f634b48 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -5236,6 +5236,24 @@ array_insert_slice(ArrayType *destArray,
  */
 ArrayBuildState *
 initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
+{
+       /*
+        * When using a subcontext, we can afford to start with a somewhat 
larger
+        * initial array size.  Without subcontexts, we'd better hope that most 
of
+        * the states stay small ...
+        */
+       return initArrayResultWithSize(element_type, rcontext, subcontext,
+                                                                  subcontext ? 
64 : 8);
+}
+
+/*
+ * initArrayResultWithSize
+ *             As initArrayResult, but allow the initial size of the allocated 
arrays
+ *             to be specified.
+ */
+ArrayBuildState *
+initArrayResultWithSize(Oid element_type, MemoryContext rcontext,
+                                               bool subcontext, int initsize)
 {
        ArrayBuildState *astate;
        MemoryContext arr_context = rcontext;
@@ -5250,7 +5268,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, 
bool subcontext)
                MemoryContextAlloc(arr_context, sizeof(ArrayBuildState));
        astate->mcontext = arr_context;
        astate->private_cxt = subcontext;
-       astate->alen = (subcontext ? 64 : 8);   /* arbitrary starting array 
size */
+       astate->alen = initsize;
        astate->dvalues = (Datum *)
                MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum));
        astate->dnulls = (bool *)
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 919138eaf3..96ccd90bd6 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -503,29 +503,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
 
        state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
-       /* Append the value unless null. */
+       /* Append the value unless null, preceding it with the delimiter. */
        if (!PG_ARGISNULL(1))
        {
                bytea      *value = PG_GETARG_BYTEA_PP(1);
+               bool            isfirst = false;
 
-               /* On the first time through, we ignore the delimiter. */
+               /*
+                * You might think we can just throw away the first delimiter, 
however
+                * we must keep it as we may be a parallel worker doing partial
+                * aggregation building a state to send to the main process.  
We need
+                * to keep the delimiter of every aggregation so that the 
combine
+                * function can properly join up the strings of two separately
+                * partially aggregated results.  The first delimiter is only 
stripped
+                * off in the final function.  To know how much to strip off 
the front
+                * of the string, we store the length of the first delimiter in 
the
+                * StringInfo's cursor field, which we don't otherwise need 
here.
+                */
                if (state == NULL)
+               {
                        state = makeStringAggState(fcinfo);
-               else if (!PG_ARGISNULL(2))
+                       isfirst = true;
+               }
+
+               if (!PG_ARGISNULL(2))
                {
                        bytea      *delim = PG_GETARG_BYTEA_PP(2);
 
-                       appendBinaryStringInfo(state, VARDATA_ANY(delim), 
VARSIZE_ANY_EXHDR(delim));
+                       appendBinaryStringInfo(state, VARDATA_ANY(delim),
+                                                                  
VARSIZE_ANY_EXHDR(delim));
+                       if (isfirst)
+                               state->cursor = VARSIZE_ANY_EXHDR(delim);
                }
 
-               appendBinaryStringInfo(state, VARDATA_ANY(value), 
VARSIZE_ANY_EXHDR(value));
+               appendBinaryStringInfo(state, VARDATA_ANY(value),
+                                                          
VARSIZE_ANY_EXHDR(value));
        }
 
        /*
         * The transition type for string_agg() is declared to be "internal",
         * which is a pass-by-value type the same size as a pointer.
         */
-       PG_RETURN_POINTER(state);
+       if (state)
+               PG_RETURN_POINTER(state);
+       PG_RETURN_NULL();
 }
 
 Datum
@@ -540,11 +561,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
 
        if (state != NULL)
        {
+               /* As per comment in transfn, strip data before the cursor 
position */
                bytea      *result;
+               int                     strippedlen = state->len - 
state->cursor;
 
-               result = (bytea *) palloc(state->len + VARHDRSZ);
-               SET_VARSIZE(result, state->len + VARHDRSZ);
-               memcpy(VARDATA(result), state->data, state->len);
+               result = (bytea *) palloc(strippedlen + VARHDRSZ);
+               SET_VARSIZE(result, strippedlen + VARHDRSZ);
+               memcpy(VARDATA(result), &state->data[state->cursor], 
strippedlen);
                PG_RETURN_BYTEA_P(result);
        }
        else
@@ -5373,23 +5396,171 @@ string_agg_transfn(PG_FUNCTION_ARGS)
 
        state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
-       /* Append the value unless null. */
+       /* Append the value unless null, preceding it with the delimiter. */
        if (!PG_ARGISNULL(1))
        {
-               /* On the first time through, we ignore the delimiter. */
+               text       *value = PG_GETARG_TEXT_PP(1);
+               bool            isfirst = false;
+
+               /*
+                * You might think we can just throw away the first delimiter, 
however
+                * we must keep it as we may be a parallel worker doing partial
+                * aggregation building a state to send to the main process.  
We need
+                * to keep the delimiter of every aggregation so that the 
combine
+                * function can properly join up the strings of two separately
+                * partially aggregated results.  The first delimiter is only 
stripped
+                * off in the final function.  To know how much to strip off 
the front
+                * of the string, we store the length of the first delimiter in 
the
+                * StringInfo's cursor field, which we don't otherwise need 
here.
+                */
                if (state == NULL)
+               {
                        state = makeStringAggState(fcinfo);
-               else if (!PG_ARGISNULL(2))
-                       appendStringInfoText(state, PG_GETARG_TEXT_PP(2));      
/* delimiter */
+                       isfirst = true;
+               }
 
-               appendStringInfoText(state, PG_GETARG_TEXT_PP(1));      /* 
value */
+               if (!PG_ARGISNULL(2))
+               {
+                       text       *delim = PG_GETARG_TEXT_PP(2);
+
+                       appendStringInfoText(state, delim);
+                       if (isfirst)
+                               state->cursor = VARSIZE_ANY_EXHDR(delim);
+               }
+
+               appendStringInfoText(state, value);
        }
 
        /*
         * The transition type for string_agg() is declared to be "internal",
         * which is a pass-by-value type the same size as a pointer.
         */
-       PG_RETURN_POINTER(state);
+       if (state)
+               PG_RETURN_POINTER(state);
+       PG_RETURN_NULL();
+}
+
+/*
+ * string_agg_combine
+ *             Aggregate combine function for string_agg(text) and 
string_agg(bytea)
+ */
+Datum
+string_agg_combine(PG_FUNCTION_ARGS)
+{
+       StringInfo      state1;
+       StringInfo      state2;
+       MemoryContext agg_context;
+
+       if (!AggCheckCallContext(fcinfo, &agg_context))
+               elog(ERROR, "aggregate function called in non-aggregate 
context");
+
+       state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+       state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
+
+       if (state2 == NULL)
+       {
+               /*
+                * NULL state2 is easy, just return state1, which we know is 
already
+                * in the agg_context
+                */
+               if (state1 == NULL)
+                       PG_RETURN_NULL();
+               PG_RETURN_POINTER(state1);
+       }
+
+       if (state1 == NULL)
+       {
+               /* We must copy state2's data into the agg_context */
+               MemoryContext old_context;
+
+               old_context = MemoryContextSwitchTo(agg_context);
+               state1 = makeStringAggState(fcinfo);
+               appendBinaryStringInfo(state1, state2->data, state2->len);
+               state1->cursor = state2->cursor;
+               MemoryContextSwitchTo(old_context);
+       }
+       else if (state2->len > 0)
+       {
+               /* Combine ... state1->cursor does not change in this case */
+               appendBinaryStringInfo(state1, state2->data, state2->len);
+       }
+
+       PG_RETURN_POINTER(state1);
+}
+
+/*
+ * string_agg_serialize
+ *             Aggregate serialize function for string_agg(text) and 
string_agg(bytea)
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_serialize(PG_FUNCTION_ARGS)
+{
+       StringInfo      state;
+       StringInfoData buf;
+       bytea      *result;
+
+       /* cannot be called directly because of internal-type argument */
+       Assert(AggCheckCallContext(fcinfo, NULL));
+
+       state = (StringInfo) PG_GETARG_POINTER(0);
+
+       pq_begintypsend(&buf);
+
+       /* cursor */
+       pq_sendint(&buf, state->cursor, 4);
+
+       /* data */
+       pq_sendbytes(&buf, state->data, state->len);
+
+       result = pq_endtypsend(&buf);
+
+       PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * string_agg_deserialize
+ *             Aggregate deserial function for string_agg(text) and 
string_agg(bytea)
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+       bytea      *sstate;
+       StringInfo      result;
+       StringInfoData buf;
+       char       *data;
+       int                     datalen;
+
+       /* cannot be called directly because of internal-type argument */
+       Assert(AggCheckCallContext(fcinfo, NULL));
+
+       sstate = PG_GETARG_BYTEA_PP(0);
+
+       /*
+        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
+        * standard recv-function infrastructure.
+        */
+       initStringInfo(&buf);
+       appendBinaryStringInfo(&buf,
+                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+
+       result = makeStringAggState(fcinfo);
+
+       /* cursor */
+       result->cursor = pq_getmsgint(&buf, 4);
+
+       /* data */
+       datalen = VARSIZE_ANY_EXHDR(sstate) - 4;
+       data = (char *) pq_getmsgbytes(&buf, datalen);
+       appendBinaryStringInfo(result, data, datalen);
+
+       pq_getmsgend(&buf);
+       pfree(buf.data);
+
+       PG_RETURN_POINTER(result);
 }
 
 Datum
@@ -5403,7 +5574,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS)
        state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
        if (state != NULL)
-               PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, 
state->len));
+       {
+               /* As per comment in transfn, strip data before the cursor 
position */
+               
PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
+                                                                               
                  state->len - state->cursor));
+       }
        else
                PG_RETURN_NULL();
 }
diff --git a/src/include/catalog/pg_aggregate.dat 
b/src/include/catalog/pg_aggregate.dat
index 86cc650798..23c933749b 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -537,19 +537,28 @@
 
 # array
 { aggfnoid => 'array_agg(anynonarray)', aggtransfn => 'array_agg_transfn',
-  aggfinalfn => 'array_agg_finalfn', aggfinalextra => 't',
-  aggtranstype => 'internal' },
+  aggcombinefn => 'array_agg_combine', aggserialfn => 'array_agg_serialize',
+  aggdeserialfn => 'array_agg_deserialize', aggfinalfn => 'array_agg_finalfn',
+  aggfinalextra => 't', aggtranstype => 'internal' },
 { aggfnoid => 'array_agg(anyarray)', aggtransfn => 'array_agg_array_transfn',
+  aggcombinefn => 'array_agg_array_combine',
+  aggserialfn => 'array_agg_array_serialize',
+  aggdeserialfn => 'array_agg_array_deserialize',
   aggfinalfn => 'array_agg_array_finalfn', aggfinalextra => 't',
   aggtranstype => 'internal' },
 
 # text
 { aggfnoid => 'string_agg(text,text)', aggtransfn => 'string_agg_transfn',
+  aggcombinefn => 'string_agg_combine', aggserialfn => 'string_agg_serialize',
+  aggdeserialfn => 'string_agg_deserialize',
   aggfinalfn => 'string_agg_finalfn', aggtranstype => 'internal' },
 
 # bytea
 { aggfnoid => 'string_agg(bytea,bytea)',
   aggtransfn => 'bytea_string_agg_transfn',
+  aggcombinefn => 'string_agg_combine',
+  aggserialfn => 'string_agg_serialize',
+  aggdeserialfn => 'string_agg_deserialize',
   aggfinalfn => 'bytea_string_agg_finalfn', aggtranstype => 'internal' },
 
 # range
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index be47583122..81ff5153d8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -1643,6 +1643,15 @@
 { oid => '2333', descr => 'aggregate transition function',
   proname => 'array_agg_transfn', proisstrict => 'f', prorettype => 'internal',
   proargtypes => 'internal anynonarray', prosrc => 'array_agg_transfn' },
+{ oid => '9328', descr => 'aggregate combine function',
+  proname => 'array_agg_combine', proisstrict => 'f', prorettype => 'internal',
+  proargtypes => 'internal internal', prosrc => 'array_agg_combine' },
+{ oid => '9329', descr => 'aggregate serial function',
+  proname => 'array_agg_serialize', prorettype => 'bytea',
+  proargtypes => 'internal', prosrc => 'array_agg_serialize' },
+{ oid => '9330', descr => 'aggregate deserial function',
+  proname => 'array_agg_deserialize', prorettype => 'internal',
+  proargtypes => 'bytea internal', prosrc => 'array_agg_deserialize' },
 { oid => '2334', descr => 'aggregate final function',
   proname => 'array_agg_finalfn', proisstrict => 'f', prorettype => 'anyarray',
   proargtypes => 'internal anynonarray', prosrc => 'array_agg_finalfn' },
@@ -1654,6 +1663,15 @@
   proname => 'array_agg_array_transfn', proisstrict => 'f',
   prorettype => 'internal', proargtypes => 'internal anyarray',
   prosrc => 'array_agg_array_transfn' },
+{ oid => '9331', descr => 'aggregate combine function',
+  proname => 'array_agg_array_combine', proisstrict => 'f', prorettype => 
'internal',
+  proargtypes => 'internal internal', prosrc => 'array_agg_array_combine' },
+{ oid => '9332', descr => 'aggregate serial function',
+  proname => 'array_agg_array_serialize', prorettype => 'bytea',
+  proargtypes => 'internal', prosrc => 'array_agg_array_serialize' },
+{ oid => '9333', descr => 'aggregate deserial function',
+  proname => 'array_agg_array_deserialize', prorettype => 'internal',
+  proargtypes => 'bytea internal', prosrc => 'array_agg_array_deserialize' },
 { oid => '4052', descr => 'aggregate final function',
   proname => 'array_agg_array_finalfn', proisstrict => 'f',
   prorettype => 'anyarray', proargtypes => 'internal anyarray',
@@ -4922,6 +4940,15 @@
 { oid => '3535', descr => 'aggregate transition function',
   proname => 'string_agg_transfn', proisstrict => 'f', prorettype => 
'internal',
   proargtypes => 'internal text text', prosrc => 'string_agg_transfn' },
+{ oid => '9334', descr => 'aggregate combine function',
+  proname => 'string_agg_combine', proisstrict => 'f', prorettype => 
'internal',
+  proargtypes => 'internal internal', prosrc => 'string_agg_combine' },
+{ oid => '9335', descr => 'aggregate serial function',
+  proname => 'string_agg_serialize', prorettype => 'bytea',
+  proargtypes => 'internal', prosrc => 'string_agg_serialize' },
+{ oid => '9336', descr => 'aggregate deserial function',
+  proname => 'string_agg_deserialize', prorettype => 'internal',
+  proargtypes => 'bytea internal', prosrc => 'string_agg_deserialize' },
 { oid => '3536', descr => 'aggregate final function',
   proname => 'string_agg_finalfn', proisstrict => 'f', prorettype => 'text',
   proargtypes => 'internal', prosrc => 'string_agg_finalfn' },
diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h
index c56822f645..f7af0c21a7 100644
--- a/src/include/parser/parse_agg.h
+++ b/src/include/parser/parse_agg.h
@@ -35,6 +35,8 @@ extern Oid    resolve_aggregate_transtype(Oid aggfuncid,
                                                                                
Oid *inputTypes,
                                                                                
int numArguments);
 
+extern bool agg_args_support_sendreceive(Aggref *aggref);
+
 extern void build_aggregate_transfn_expr(Oid *agg_input_types,
                                                                                
 int agg_num_inputs,
                                                                                
 int agg_num_direct_inputs,
diff --git a/src/include/utils/array.h b/src/include/utils/array.h
index 2f794d1168..ed82242bd8 100644
--- a/src/include/utils/array.h
+++ b/src/include/utils/array.h
@@ -409,6 +409,9 @@ extern bool array_contains_nulls(ArrayType *array);
 
 extern ArrayBuildState *initArrayResult(Oid element_type,
                                                                                
MemoryContext rcontext, bool subcontext);
+extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
+                                                                               
                MemoryContext rcontext,
+                                                                               
                bool subcontext, int initsize);
 extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
                                                                                
 Datum dvalue, bool disnull,
                                                                                
 Oid element_type,
diff --git a/src/test/regress/expected/aggregates.out 
b/src/test/regress/expected/aggregates.out
index b2198724e3..d92ab0f7dc 100644
--- a/src/test/regress/expected/aggregates.out
+++ b/src/test/regress/expected/aggregates.out
@@ -1831,6 +1831,104 @@ select string_agg(v, decode('ee', 'hex')) from 
bytea_test_table;
 (1 row)
 
 drop table bytea_test_table;
+-- Test parallel string_agg and array_agg
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+set bytea_output = 'escape';
+-- create a view as we otherwise have to repeat this query a few times.
+create view v_pagg_test AS
+select
+       y,
+       min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+       min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+       min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+       min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+       select
+               y,
+               unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+               unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+               unnest(a1.a) AS a,
+               unnest(a1.aa) AS aa
+       from (
+               select
+                       y,
+                       string_agg(x::text, ',') AS t,
+                       string_agg(x::text::bytea, ',') AS b,
+                       array_agg(x) AS a,
+                       array_agg(ARRAY[x]) AS aa
+               from pagg_test
+               group by y
+       ) a1
+) a2
+group by y;
+-- Ensure results are correct.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | 
andistinct | aamin | aamax | aandistinct 
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 |   10 | 5000 |        500 | 10   | 990  |        500 |   10 | 5000 |       
 500 |    10 |  5000 |         500
+ 1 |   11 | 4991 |        250 | 1011 | 991  |        250 |   11 | 4991 |       
 250 |    11 |  4991 |         250
+ 2 |    2 | 4992 |        500 | 1002 | 992  |        500 |    2 | 4992 |       
 500 |     2 |  4992 |         500
+ 3 |    3 | 4983 |        250 | 1003 | 983  |        250 |    3 | 4983 |       
 250 |     3 |  4983 |         250
+ 4 |    4 | 4994 |        500 | 1004 | 994  |        500 |    4 | 4994 |       
 500 |     4 |  4994 |         500
+ 5 |   15 | 4995 |        250 | 1015 | 995  |        250 |   15 | 4995 |       
 250 |    15 |  4995 |         250
+ 6 |    6 | 4996 |        500 | 1006 | 996  |        500 |    6 | 4996 |       
 500 |     6 |  4996 |         500
+ 7 |    7 | 4987 |        250 | 1007 | 987  |        250 |    7 | 4987 |       
 250 |     7 |  4987 |         250
+ 8 |    8 | 4998 |        500 | 1008 | 998  |        500 |    8 | 4998 |       
 500 |     8 |  4998 |         500
+ 9 |   19 | 4999 |        250 | 1019 | 999  |        250 |   19 | 4999 |       
 250 |    19 |  4999 |         250
+(10 rows)
+
+-- Ensure parallel aggregation is actually being used.
+explain (costs off) select * from v_pagg_test order by y;
+                                                              QUERY PLAN       
                                                       
+--------------------------------------------------------------------------------------------------------------------------------------
+ GroupAggregate
+   Group Key: pagg_test.y
+   ->  Sort
+         Sort Key: pagg_test.y, 
(((unnest(regexp_split_to_array((string_agg((pagg_test.x)::text, ','::text)), 
','::text))))::integer)
+         ->  Result
+               ->  ProjectSet
+                     ->  Finalize HashAggregate
+                           Group Key: pagg_test.y
+                           ->  Gather
+                                 Workers Planned: 2
+                                 ->  Partial HashAggregate
+                                       Group Key: pagg_test.y
+                                       ->  Parallel Seq Scan on pagg_test
+(13 rows)
+
+set max_parallel_workers_per_gather = 0;
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | 
andistinct | aamin | aamax | aandistinct 
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 |   10 | 5000 |        500 | 10   | 990  |        500 |   10 | 5000 |       
 500 |    10 |  5000 |         500
+ 1 |   11 | 4991 |        250 | 1011 | 991  |        250 |   11 | 4991 |       
 250 |    11 |  4991 |         250
+ 2 |    2 | 4992 |        500 | 1002 | 992  |        500 |    2 | 4992 |       
 500 |     2 |  4992 |         500
+ 3 |    3 | 4983 |        250 | 1003 | 983  |        250 |    3 | 4983 |       
 250 |     3 |  4983 |         250
+ 4 |    4 | 4994 |        500 | 1004 | 994  |        500 |    4 | 4994 |       
 500 |     4 |  4994 |         500
+ 5 |   15 | 4995 |        250 | 1015 | 995  |        250 |   15 | 4995 |       
 250 |    15 |  4995 |         250
+ 6 |    6 | 4996 |        500 | 1006 | 996  |        500 |    6 | 4996 |       
 500 |     6 |  4996 |         500
+ 7 |    7 | 4987 |        250 | 1007 | 987  |        250 |    7 | 4987 |       
 250 |     7 |  4987 |         250
+ 8 |    8 | 4998 |        500 | 1008 | 998  |        500 |    8 | 4998 |       
 500 |     8 |  4998 |         500
+ 9 |   19 | 4999 |        250 | 1019 | 999  |        250 |   19 | 4999 |       
 250 |    19 |  4999 |         250
+(10 rows)
+
+-- Clean up
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+drop view v_pagg_test;
+drop table pagg_test;
 -- FILTER tests
 select min(unique1) filter (where unique1 > 100) from tenk1;
  min 
diff --git a/src/test/regress/sql/aggregates.sql 
b/src/test/regress/sql/aggregates.sql
index 4540a06f45..bdbea3cbd9 100644
--- a/src/test/regress/sql/aggregates.sql
+++ b/src/test/regress/sql/aggregates.sql
@@ -700,6 +700,68 @@ select string_agg(v, decode('ee', 'hex')) from 
bytea_test_table;
 
 drop table bytea_test_table;
 
+-- Test parallel string_agg and array_agg
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+set bytea_output = 'escape';
+
+-- create a view as we otherwise have to repeat this query a few times.
+create view v_pagg_test AS
+select
+       y,
+       min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+       min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+       min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+       min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+       select
+               y,
+               unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+               unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+               unnest(a1.a) AS a,
+               unnest(a1.aa) AS aa
+       from (
+               select
+                       y,
+                       string_agg(x::text, ',') AS t,
+                       string_agg(x::text::bytea, ',') AS b,
+                       array_agg(x) AS a,
+                       array_agg(ARRAY[x]) AS aa
+               from pagg_test
+               group by y
+       ) a1
+) a2
+group by y;
+
+-- Ensure results are correct.
+select * from v_pagg_test order by y;
+
+-- Ensure parallel aggregation is actually being used.
+explain (costs off) select * from v_pagg_test order by y;
+
+set max_parallel_workers_per_gather = 0;
+
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+
+-- Clean up
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+
+drop view v_pagg_test;
+drop table pagg_test;
+
 -- FILTER tests
 
 select min(unique1) filter (where unique1 > 100) from tenk1;

Reply via email to