> On Wed, 20 Sept 2023 at 15:00, Ashutosh Bapat > <ashutosh.bapat....@gmail.com> wrote: > > > > 0005 - Refactored Jian's code fixing window functions. Does not > > contain the changes for serialization and deserialization. Jian, > > please let me know if I have missed anything else. > >
attached serialization and deserialization function. > > Also, in do_interval_discard(), this seems a bit risky: > > + neg_val.day = -newval->day; > + neg_val.month = -newval->month; > + neg_val.time = -newval->time; > we already have interval negate function, So I changed to interval_um_internal. based on 20230920 patches. I have made the attached changes. The serialization do make big difference when configure to parallel mode.
From bff5e3dfa8607a8b45aa287a7c55fda9d984f339 Mon Sep 17 00:00:00 2001 From: pgaddict <jian.universal...@gmail.com> Date: Thu, 21 Sep 2023 10:05:21 +0800 Subject: [PATCH v22 7/7] interval aggregate serializatinn and deserialization add interval aggregate serialization and deserialization function. fix a typo. change manually negate a interval to use interval_um_internal. --- src/backend/utils/adt/timestamp.c | 86 +++++++++++++++++++++++++--- src/include/catalog/pg_aggregate.dat | 4 ++ src/include/catalog/pg_proc.dat | 6 ++ 3 files changed, 88 insertions(+), 8 deletions(-) diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index c6dc2d44..2b86171a 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -75,7 +75,7 @@ typedef struct /* * The transition datatype for interval aggregates is declared as Internal. It's - * a pointer to a NumericAggState allocated in the aggregate context. + * a pointer to a IntervalAggState allocated in the aggregate context. */ typedef struct IntervalAggState { @@ -3984,10 +3984,7 @@ interval_avg_combine(PG_FUNCTION_ARGS) state1->N = state2->N; state1->pInfcount = state2->pInfcount; state1->nInfcount = state2->nInfcount; - - state1->sumX.day = state2->sumX.day; - state1->sumX.month = state2->sumX.month; - state1->sumX.time = state2->sumX.time; + memcpy(&state1->sumX, &state2->sumX, sizeof(Interval)); PG_RETURN_POINTER(state1); } @@ -4008,6 +4005,81 @@ interval_avg_combine(PG_FUNCTION_ARGS) PG_RETURN_POINTER(state1); } +/* + * interval_avg_serialize + * Serialize IntervalAggState for interval aggregates. + */ +Datum +interval_avg_serialize(PG_FUNCTION_ARGS) +{ + IntervalAggState *state; + StringInfoData buf; + bytea *result; + /* Ensure we disallow calling when not in aggregate context */ + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + state = (IntervalAggState *) PG_GETARG_POINTER(0); + pq_begintypsend(&buf); + /* N */ + pq_sendint64(&buf, state->N); + /* Interval struct elements, one by one. */ + pq_sendint64(&buf, state->sumX.time); + pq_sendint32(&buf, state->sumX.day); + pq_sendint32(&buf, state->sumX.month); + /* pInfcount */ + pq_sendint64(&buf, state->pInfcount); + /* nInfcount */ + pq_sendint64(&buf, state->nInfcount); + result = pq_endtypsend(&buf); + PG_RETURN_BYTEA_P(result); +} + +/* + * interval_avg_deserialize + * Deserialize bytea into IntervalAggState for interval aggregates. + */ +Datum +interval_avg_deserialize(PG_FUNCTION_ARGS) +{ + bytea *sstate; + IntervalAggState *result; + StringInfoData buf; + + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. + */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + result = makeIntervalAggState(fcinfo); + + /* N */ + result->N = pq_getmsgint64(&buf); + + /* Interval struct elements, one by one. */ + result->sumX.time = pq_getmsgint64(&buf); + result->sumX.day = pq_getmsgint(&buf, sizeof(result->sumX.day)); + result->sumX.month = pq_getmsgint(&buf, sizeof(result->sumX.month)); + + /* pInfcount */ + result->pInfcount = pq_getmsgint64(&buf); + + /* nInfcount */ + result->nInfcount = pq_getmsgint64(&buf); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); +} + /* * Remove the given interval value from the aggregated state. */ @@ -4034,9 +4106,7 @@ do_interval_discard(IntervalAggState *state, Interval *newval) Interval temp; Interval neg_val; - neg_val.day = -newval->day; - neg_val.month = -newval->month; - neg_val.time = -newval->time; + interval_um_internal(newval, &neg_val); memcpy(&temp, &state->sumX, sizeof(Interval)); finite_interval_pl(&state->sumX, &temp, &neg_val); diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat index e2087d7b..0e62c3f7 100644 --- a/src/include/catalog/pg_aggregate.dat +++ b/src/include/catalog/pg_aggregate.dat @@ -45,6 +45,8 @@ aggtranstype => '_float8', agginitval => '{0,0,0}' }, { aggfnoid => 'avg(interval)', aggtransfn => 'interval_avg_accum', aggfinalfn => 'interval_avg', aggcombinefn => 'interval_avg_combine', + aggserialfn => 'interval_avg_serialize', + aggdeserialfn => 'interval_avg_deserialize', aggmtransfn => 'interval_avg_accum', aggminvtransfn => 'interval_accum_inv', aggmfinalfn => 'interval_avg', aggtranstype => 'internal', aggmtranstype => 'internal', aggtransspace => '128', @@ -75,6 +77,8 @@ aggtranstype => 'money', aggmtranstype => 'money' }, { aggfnoid => 'sum(interval)', aggtransfn => 'interval_avg_accum', aggfinalfn => 'interval_sum', aggcombinefn => 'interval_avg_combine', + aggserialfn => 'interval_avg_serialize', + aggdeserialfn => 'interval_avg_deserialize', aggmtransfn => 'interval_avg_accum', aggminvtransfn => 'interval_accum_inv', aggmfinalfn => 'interval_sum', aggtranstype => 'internal', aggmtranstype => 'internal', aggtransspace => '128', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f8eca58d..486573d6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -4943,6 +4943,12 @@ { oid => '3571', descr => 'aggregate transition function', proname => 'int4_avg_accum_inv', prorettype => '_int8', proargtypes => '_int8 int4', prosrc => 'int4_avg_accum_inv' }, +{ oid => '3813', descr => 'aggregate serial function', + proname => 'interval_avg_serialize', prorettype => 'bytea', + proargtypes => 'internal', prosrc => 'interval_avg_serialize' }, +{ oid => '3814', descr => 'aggregate deserial function', + proname => 'interval_avg_deserialize', prorettype => 'internal', + proargtypes => 'bytea internal', prosrc => 'interval_avg_deserialize' }, { oid => '1964', descr => 'aggregate final function', proname => 'int8_avg', prorettype => 'numeric', proargtypes => '_int8', prosrc => 'int8_avg' }, -- 2.34.1