> On Wed, 20 Sept 2023 at 15:00, Ashutosh Bapat
> <ashutosh.bapat....@gmail.com> wrote:
> >
> > 0005 - Refactored Jian's code fixing window functions. Does not
> > contain the changes for serialization and deserialization. Jian,
> > please let me know if I have missed anything else.
> >

I have attached the serialization and deserialization functions.


>
> Also, in do_interval_discard(), this seems a bit risky:
>
> +        neg_val.day = -newval->day;
> +        neg_val.month = -newval->month;
> +        neg_val.time = -newval->time;
>

We already have an interval negation function, so I changed this to use
interval_um_internal. The attached changes are based on the 20230920 patches.

The serialization functions do make a big difference when configured for parallel mode.
From bff5e3dfa8607a8b45aa287a7c55fda9d984f339 Mon Sep 17 00:00:00 2001
From: pgaddict <jian.universal...@gmail.com>
Date: Thu, 21 Sep 2023 10:05:21 +0800
Subject: [PATCH v22 7/7] interval aggregate serialization and deserialization

Add interval aggregate serialization and deserialization functions.
Fix a typo in the IntervalAggState comment.
Replace the manual negation of an interval with interval_um_internal.
---
 src/backend/utils/adt/timestamp.c    | 86 +++++++++++++++++++++++++---
 src/include/catalog/pg_aggregate.dat |  4 ++
 src/include/catalog/pg_proc.dat      |  6 ++
 3 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index c6dc2d44..2b86171a 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -75,7 +75,7 @@ typedef struct
 
 /*
  * The transition datatype for interval aggregates is declared as Internal. It's
- * a pointer to a NumericAggState allocated in the aggregate context.
+ * a pointer to an IntervalAggState allocated in the aggregate context.
  */
 typedef struct IntervalAggState
 {
@@ -3984,10 +3984,7 @@ interval_avg_combine(PG_FUNCTION_ARGS)
 		state1->N = state2->N;
 		state1->pInfcount = state2->pInfcount;
 		state1->nInfcount = state2->nInfcount;
-
-		state1->sumX.day = state2->sumX.day;
-		state1->sumX.month = state2->sumX.month;
-		state1->sumX.time = state2->sumX.time;
+		memcpy(&state1->sumX, &state2->sumX, sizeof(Interval));
 
 		PG_RETURN_POINTER(state1);
 	}
@@ -4008,6 +4005,81 @@ interval_avg_combine(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(state1);
 }
 
+/*
+ * interval_avg_serialize
+ *		Serialize IntervalAggState for interval aggregates.
+ */
+Datum
+interval_avg_serialize(PG_FUNCTION_ARGS)
+{
+	IntervalAggState *state;
+	StringInfoData buf;
+	bytea	   *result;
+	/* Ensure we disallow calling when not in aggregate context */
+	if (!AggCheckCallContext(fcinfo, NULL))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+	state = (IntervalAggState *) PG_GETARG_POINTER(0);
+	pq_begintypsend(&buf);
+	/* N */
+	pq_sendint64(&buf, state->N);
+	/* Interval struct elements, one by one. */
+	pq_sendint64(&buf, state->sumX.time);
+	pq_sendint32(&buf, state->sumX.day);
+	pq_sendint32(&buf, state->sumX.month);
+	/* pInfcount */
+	pq_sendint64(&buf, state->pInfcount);
+	/* nInfcount */
+	pq_sendint64(&buf, state->nInfcount);
+	result = pq_endtypsend(&buf);
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * interval_avg_deserialize
+ *		Deserialize bytea into IntervalAggState for interval aggregates.
+ */
+Datum
+interval_avg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	IntervalAggState *result;
+	StringInfoData buf;
+
+	if (!AggCheckCallContext(fcinfo, NULL))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	result = makeIntervalAggState(fcinfo);
+
+	/* N */
+	result->N = pq_getmsgint64(&buf);
+
+	/* Interval struct elements, one by one. */
+	result->sumX.time = pq_getmsgint64(&buf);
+	result->sumX.day = pq_getmsgint(&buf, sizeof(result->sumX.day));
+	result->sumX.month = pq_getmsgint(&buf, sizeof(result->sumX.month));
+
+	/* pInfcount */
+	result->pInfcount = pq_getmsgint64(&buf);
+
+	/* nInfcount */
+	result->nInfcount = pq_getmsgint64(&buf);
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 /*
  * Remove the given interval value from the aggregated state.
  */
@@ -4034,9 +4106,7 @@ do_interval_discard(IntervalAggState *state, Interval *newval)
 		Interval	temp;
 		Interval	neg_val;
 
-		neg_val.day = -newval->day;
-		neg_val.month = -newval->month;
-		neg_val.time = -newval->time;
+		interval_um_internal(newval, &neg_val);
 
 		memcpy(&temp, &state->sumX, sizeof(Interval));
 		finite_interval_pl(&state->sumX, &temp, &neg_val);
diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat
index e2087d7b..0e62c3f7 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -45,6 +45,8 @@
   aggtranstype => '_float8', agginitval => '{0,0,0}' },
 { aggfnoid => 'avg(interval)', aggtransfn => 'interval_avg_accum',
   aggfinalfn => 'interval_avg', aggcombinefn => 'interval_avg_combine',
+  aggserialfn => 'interval_avg_serialize',
+  aggdeserialfn => 'interval_avg_deserialize',
   aggmtransfn => 'interval_avg_accum', aggminvtransfn => 'interval_accum_inv',
   aggmfinalfn => 'interval_avg', aggtranstype => 'internal',
   aggmtranstype => 'internal', aggtransspace => '128',
@@ -75,6 +77,8 @@
   aggtranstype => 'money', aggmtranstype => 'money' },
 { aggfnoid => 'sum(interval)', aggtransfn => 'interval_avg_accum',
   aggfinalfn => 'interval_sum', aggcombinefn => 'interval_avg_combine',
+  aggserialfn => 'interval_avg_serialize',
+  aggdeserialfn => 'interval_avg_deserialize',
   aggmtransfn => 'interval_avg_accum', aggminvtransfn => 'interval_accum_inv',
   aggmfinalfn => 'interval_sum', aggtranstype => 'internal',
   aggmtranstype => 'internal', aggtransspace => '128',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f8eca58d..486573d6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4943,6 +4943,12 @@
 { oid => '3571', descr => 'aggregate transition function',
   proname => 'int4_avg_accum_inv', prorettype => '_int8',
   proargtypes => '_int8 int4', prosrc => 'int4_avg_accum_inv' },
+{ oid => '3813', descr => 'aggregate serial function',
+  proname => 'interval_avg_serialize', prorettype => 'bytea',
+  proargtypes => 'internal', prosrc => 'interval_avg_serialize' },
+{ oid => '3814', descr => 'aggregate deserial function',
+  proname => 'interval_avg_deserialize', prorettype => 'internal',
+  proargtypes => 'bytea internal', prosrc => 'interval_avg_deserialize' },
 { oid => '1964', descr => 'aggregate final function',
   proname => 'int8_avg', prorettype => 'numeric', proargtypes => '_int8',
   prosrc => 'int8_avg' },
-- 
2.34.1

Reply via email to