On Fri, 2 Jul 2021 at 10:24, David Rowley <dgrowle...@gmail.com> wrote:
>
> I ran this again with a few different worker counts after tuning a few
> memory settings so there was no spilling to disk and so everything was
> in RAM. Mostly so I could get consistent results.
>
> Here's the results. Average over 3 runs on each:
>
> Workers   Master    Patched   Percent
> 8         11094.1   11084.9   100.08%
> 16         8711.4    8562.6   101.74%
> 32         6961.4    6726.3   103.50%
> 64         6137.4    5854.8   104.83%
> 128        6090.3    5747.4   105.96%
>
Thanks for testing again. Those are nice looking results, and are much
more in line with what I was seeing.

> So the gains are much less at lower worker counts. I think this is
> because most of the gains are in the serial part of the plan and with
> higher worker counts that part of the plan is relatively much bigger.
>
> So likely performance isn't too critical here, but it is something to
> keep in mind.
>

Yes, agreed. I suspect there's not much more that can be shaved off
this particular piece of code now though.

Here's an update with the last set of changes discussed.

Regards,
Dean
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c new file mode 100644 index eb78f0b..a8c6bbf --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -515,6 +515,9 @@ static void set_var_from_var(const Numer static char *get_str_from_var(const NumericVar *var); static char *get_str_from_var_sci(const NumericVar *var, int rscale); +static void numericvar_serialize(StringInfo buf, const NumericVar *var); +static void numericvar_deserialize(StringInfo buf, NumericVar *var); + static Numeric duplicate_numeric(Numeric num); static Numeric make_result(const NumericVar *var); static Numeric make_result_opt_error(const NumericVar *var, bool *error); @@ -4943,38 +4946,25 @@ numeric_avg_serialize(PG_FUNCTION_ARGS) { NumericAggState *state; StringInfoData buf; - Datum temp; - bytea *sumX; bytea *result; NumericVar tmp_var; + init_var(&tmp_var); + /* Ensure we disallow calling when not in aggregate context */ if (!AggCheckCallContext(fcinfo, NULL)) elog(ERROR, "aggregate function called in non-aggregate context"); state = (NumericAggState *) PG_GETARG_POINTER(0); - /* - * This is a little wasteful since make_result converts the NumericVar - * into a Numeric and numeric_send converts it back again. Is it worth - * splitting the tasks in numeric_send into separate functions to stop - * this? Doing so would also remove the fmgr call overhead. 
- */ - init_var(&tmp_var); - accum_sum_final(&state->sumX, &tmp_var); - - temp = DirectFunctionCall1(numeric_send, - NumericGetDatum(make_result(&tmp_var))); - sumX = DatumGetByteaPP(temp); - free_var(&tmp_var); - pq_begintypsend(&buf); /* N */ pq_sendint64(&buf, state->N); /* sumX */ - pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX)); + accum_sum_final(&state->sumX, &tmp_var); + numericvar_serialize(&buf, &tmp_var); /* maxScale */ pq_sendint32(&buf, state->maxScale); @@ -4993,6 +4983,8 @@ numeric_avg_serialize(PG_FUNCTION_ARGS) result = pq_endtypsend(&buf); + free_var(&tmp_var); + PG_RETURN_BYTEA_P(result); } @@ -5006,9 +4998,10 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS { bytea *sstate; NumericAggState *result; - Datum temp; - NumericVar tmp_var; StringInfoData buf; + NumericVar tmp_var; + + init_var(&tmp_var); if (!AggCheckCallContext(fcinfo, NULL)) elog(ERROR, "aggregate function called in non-aggregate context"); @@ -5029,11 +5022,7 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS result->N = pq_getmsgint64(&buf); /* sumX */ - temp = DirectFunctionCall3(numeric_recv, - PointerGetDatum(&buf), - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - init_var_from_num(DatumGetNumeric(temp), &tmp_var); + numericvar_deserialize(&buf, &tmp_var); accum_sum_add(&(result->sumX), &tmp_var); /* maxScale */ @@ -5054,6 +5043,8 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS pq_getmsgend(&buf); pfree(buf.data); + free_var(&tmp_var); + PG_RETURN_POINTER(result); } @@ -5067,11 +5058,10 @@ numeric_serialize(PG_FUNCTION_ARGS) { NumericAggState *state; StringInfoData buf; - Datum temp; - bytea *sumX; - NumericVar tmp_var; - bytea *sumX2; bytea *result; + NumericVar tmp_var; + + init_var(&tmp_var); /* Ensure we disallow calling when not in aggregate context */ if (!AggCheckCallContext(fcinfo, NULL)) @@ -5079,36 +5069,18 @@ numeric_serialize(PG_FUNCTION_ARGS) state = (NumericAggState *) PG_GETARG_POINTER(0); - /* - * This is a little wasteful since make_result converts the 
NumericVar - * into a Numeric and numeric_send converts it back again. Is it worth - * splitting the tasks in numeric_send into separate functions to stop - * this? Doing so would also remove the fmgr call overhead. - */ - init_var(&tmp_var); - - accum_sum_final(&state->sumX, &tmp_var); - temp = DirectFunctionCall1(numeric_send, - NumericGetDatum(make_result(&tmp_var))); - sumX = DatumGetByteaPP(temp); - - accum_sum_final(&state->sumX2, &tmp_var); - temp = DirectFunctionCall1(numeric_send, - NumericGetDatum(make_result(&tmp_var))); - sumX2 = DatumGetByteaPP(temp); - - free_var(&tmp_var); - pq_begintypsend(&buf); /* N */ pq_sendint64(&buf, state->N); /* sumX */ - pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX)); + accum_sum_final(&state->sumX, &tmp_var); + numericvar_serialize(&buf, &tmp_var); /* sumX2 */ - pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2)); + accum_sum_final(&state->sumX2, &tmp_var); + numericvar_serialize(&buf, &tmp_var); /* maxScale */ pq_sendint32(&buf, state->maxScale); @@ -5127,6 +5099,8 @@ numeric_serialize(PG_FUNCTION_ARGS) result = pq_endtypsend(&buf); + free_var(&tmp_var); + PG_RETURN_BYTEA_P(result); } @@ -5140,10 +5114,10 @@ numeric_deserialize(PG_FUNCTION_ARGS) { bytea *sstate; NumericAggState *result; - Datum temp; - NumericVar sumX_var; - NumericVar sumX2_var; StringInfoData buf; + NumericVar tmp_var; + + init_var(&tmp_var); if (!AggCheckCallContext(fcinfo, NULL)) elog(ERROR, "aggregate function called in non-aggregate context"); @@ -5164,20 +5138,12 @@ numeric_deserialize(PG_FUNCTION_ARGS) result->N = pq_getmsgint64(&buf); /* sumX */ - temp = DirectFunctionCall3(numeric_recv, - PointerGetDatum(&buf), - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - init_var_from_num(DatumGetNumeric(temp), &sumX_var); - accum_sum_add(&(result->sumX), &sumX_var); + numericvar_deserialize(&buf, &tmp_var); + accum_sum_add(&(result->sumX), &tmp_var); /* sumX2 */ - temp = DirectFunctionCall3(numeric_recv, - 
PointerGetDatum(&buf), - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - init_var_from_num(DatumGetNumeric(temp), &sumX2_var); - accum_sum_add(&(result->sumX2), &sumX2_var); + numericvar_deserialize(&buf, &tmp_var); + accum_sum_add(&(result->sumX2), &tmp_var); /* maxScale */ result->maxScale = pq_getmsgint(&buf, 4); @@ -5197,6 +5163,8 @@ numeric_deserialize(PG_FUNCTION_ARGS) pq_getmsgend(&buf); pfree(buf.data); + free_var(&tmp_var); + PG_RETURN_POINTER(result); } @@ -5459,9 +5427,10 @@ numeric_poly_serialize(PG_FUNCTION_ARGS) { PolyNumAggState *state; StringInfoData buf; - bytea *sumX; - bytea *sumX2; bytea *result; + NumericVar tmp_var; + + init_var(&tmp_var); /* Ensure we disallow calling when not in aggregate context */ if (!AggCheckCallContext(fcinfo, NULL)) @@ -5477,32 +5446,6 @@ numeric_poly_serialize(PG_FUNCTION_ARGS) * day we might like to send these over to another server for further * processing and we want a standard format to work with. */ - { - Datum temp; - NumericVar num; - - init_var(&num); - -#ifdef HAVE_INT128 - int128_to_numericvar(state->sumX, &num); -#else - accum_sum_final(&state->sumX, &num); -#endif - temp = DirectFunctionCall1(numeric_send, - NumericGetDatum(make_result(&num))); - sumX = DatumGetByteaPP(temp); - -#ifdef HAVE_INT128 - int128_to_numericvar(state->sumX2, &num); -#else - accum_sum_final(&state->sumX2, &num); -#endif - temp = DirectFunctionCall1(numeric_send, - NumericGetDatum(make_result(&num))); - sumX2 = DatumGetByteaPP(temp); - - free_var(&num); - } pq_begintypsend(&buf); @@ -5510,13 +5453,25 @@ numeric_poly_serialize(PG_FUNCTION_ARGS) pq_sendint64(&buf, state->N); /* sumX */ - pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX)); +#ifdef HAVE_INT128 + int128_to_numericvar(state->sumX, &tmp_var); +#else + accum_sum_final(&state->sumX, &tmp_var); +#endif + numericvar_serialize(&buf, &tmp_var); /* sumX2 */ - pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2)); +#ifdef HAVE_INT128 + 
int128_to_numericvar(state->sumX2, &tmp_var); +#else + accum_sum_final(&state->sumX2, &tmp_var); +#endif + numericvar_serialize(&buf, &tmp_var); result = pq_endtypsend(&buf); + free_var(&tmp_var); + PG_RETURN_BYTEA_P(result); } @@ -5530,11 +5485,10 @@ numeric_poly_deserialize(PG_FUNCTION_ARG { bytea *sstate; PolyNumAggState *result; - Datum sumX; - NumericVar sumX_var; - Datum sumX2; - NumericVar sumX2_var; StringInfoData buf; + NumericVar tmp_var; + + init_var(&tmp_var); if (!AggCheckCallContext(fcinfo, NULL)) elog(ERROR, "aggregate function called in non-aggregate context"); @@ -5555,34 +5509,26 @@ numeric_poly_deserialize(PG_FUNCTION_ARG result->N = pq_getmsgint64(&buf); /* sumX */ - sumX = DirectFunctionCall3(numeric_recv, - PointerGetDatum(&buf), - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - - /* sumX2 */ - sumX2 = DirectFunctionCall3(numeric_recv, - PointerGetDatum(&buf), - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - - init_var_from_num(DatumGetNumeric(sumX), &sumX_var); + numericvar_deserialize(&buf, &tmp_var); #ifdef HAVE_INT128 - numericvar_to_int128(&sumX_var, &result->sumX); + numericvar_to_int128(&tmp_var, &result->sumX); #else - accum_sum_add(&result->sumX, &sumX_var); + accum_sum_add(&result->sumX, &tmp_var); #endif - init_var_from_num(DatumGetNumeric(sumX2), &sumX2_var); + /* sumX2 */ + numericvar_deserialize(&buf, &tmp_var); #ifdef HAVE_INT128 - numericvar_to_int128(&sumX2_var, &result->sumX2); + numericvar_to_int128(&tmp_var, &result->sumX2); #else - accum_sum_add(&result->sumX2, &sumX2_var); + accum_sum_add(&result->sumX2, &tmp_var); #endif pq_getmsgend(&buf); pfree(buf.data); + free_var(&tmp_var); + PG_RETURN_POINTER(result); } @@ -5681,8 +5627,10 @@ int8_avg_serialize(PG_FUNCTION_ARGS) { PolyNumAggState *state; StringInfoData buf; - bytea *sumX; bytea *result; + NumericVar tmp_var; + + init_var(&tmp_var); /* Ensure we disallow calling when not in aggregate context */ if (!AggCheckCallContext(fcinfo, NULL)) @@ -5698,23 +5646,6 
@@ int8_avg_serialize(PG_FUNCTION_ARGS) * like to send these over to another server for further processing and we * want a standard format to work with. */ - { - Datum temp; - NumericVar num; - - init_var(&num); - -#ifdef HAVE_INT128 - int128_to_numericvar(state->sumX, &num); -#else - accum_sum_final(&state->sumX, &num); -#endif - temp = DirectFunctionCall1(numeric_send, - NumericGetDatum(make_result(&num))); - sumX = DatumGetByteaPP(temp); - - free_var(&num); - } pq_begintypsend(&buf); @@ -5722,10 +5653,17 @@ int8_avg_serialize(PG_FUNCTION_ARGS) pq_sendint64(&buf, state->N); /* sumX */ - pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX)); +#ifdef HAVE_INT128 + int128_to_numericvar(state->sumX, &tmp_var); +#else + accum_sum_final(&state->sumX, &tmp_var); +#endif + numericvar_serialize(&buf, &tmp_var); result = pq_endtypsend(&buf); + free_var(&tmp_var); + PG_RETURN_BYTEA_P(result); } @@ -5739,8 +5677,9 @@ int8_avg_deserialize(PG_FUNCTION_ARGS) bytea *sstate; PolyNumAggState *result; StringInfoData buf; - Datum temp; - NumericVar num; + NumericVar tmp_var; + + init_var(&tmp_var); if (!AggCheckCallContext(fcinfo, NULL)) elog(ERROR, "aggregate function called in non-aggregate context"); @@ -5761,20 +5700,18 @@ int8_avg_deserialize(PG_FUNCTION_ARGS) result->N = pq_getmsgint64(&buf); /* sumX */ - temp = DirectFunctionCall3(numeric_recv, - PointerGetDatum(&buf), - ObjectIdGetDatum(InvalidOid), - Int32GetDatum(-1)); - init_var_from_num(DatumGetNumeric(temp), &num); + numericvar_deserialize(&buf, &tmp_var); #ifdef HAVE_INT128 - numericvar_to_int128(&num, &result->sumX); + numericvar_to_int128(&tmp_var, &result->sumX); #else - accum_sum_add(&result->sumX, &num); + accum_sum_add(&result->sumX, &tmp_var); #endif pq_getmsgend(&buf); pfree(buf.data); + free_var(&tmp_var); + PG_RETURN_POINTER(result); } @@ -7286,6 +7223,48 @@ get_str_from_var_sci(const NumericVar *v } +/* + * numericvar_serialize - serialize NumericVar to binary format + * + * At variable level, no 
checks are performed on the weight or dscale, allowing + * us to pass around intermediate values with higher precision than supported + * by the numeric type. Note: this is incompatible with numeric_send/recv(), + * which use 16-bit integers for these fields. + */ +static void +numericvar_serialize(StringInfo buf, const NumericVar *var) +{ + int i; + + pq_sendint32(buf, var->ndigits); + pq_sendint32(buf, var->weight); + pq_sendint32(buf, var->sign); + pq_sendint32(buf, var->dscale); + for (i = 0; i < var->ndigits; i++) + pq_sendint16(buf, var->digits[i]); +} + +/* + * numericvar_deserialize - deserialize binary format to NumericVar + */ +static void +numericvar_deserialize(StringInfo buf, NumericVar *var) +{ + int len, + i; + + len = pq_getmsgint(buf, sizeof(int32)); + + alloc_var(var, len); /* sets var->ndigits */ + + var->weight = pq_getmsgint(buf, sizeof(int32)); + var->sign = pq_getmsgint(buf, sizeof(int32)); + var->dscale = pq_getmsgint(buf, sizeof(int32)); + for (i = 0; i < len; i++) + var->digits[i] = pq_getmsgint(buf, sizeof(int16)); +} + + /* * duplicate_numeric() - copy a packed-format Numeric * diff --git a/src/test/regress/expected/numeric.out b/src/test/regress/expected/numeric.out new file mode 100644 index 30a5642..4ad4851 --- a/src/test/regress/expected/numeric.out +++ b/src/test/regress/expected/numeric.out @@ -2967,6 +2967,56 @@ SELECT SUM((-9999)::numeric) FROM genera (1 row) -- +-- Tests for VARIANCE() +-- +CREATE TABLE num_variance (a numeric); +INSERT INTO num_variance VALUES (0); +INSERT INTO num_variance VALUES (3e-500); +INSERT INTO num_variance VALUES (-3e-500); +INSERT INTO num_variance VALUES (4e-500 - 1e-16383); +INSERT INTO num_variance VALUES (-4e-500 + 1e-16383); +-- variance is just under 12.5e-1000 and so should round down to 12e-1000 +SELECT trim_scale(variance(a) * 1e1000) FROM num_variance; + trim_scale +------------ + 12 +(1 row) + +-- check that parallel execution produces the same result +BEGIN; +ALTER TABLE num_variance SET 
(parallel_workers = 4); +SET LOCAL parallel_setup_cost = 0; +SET LOCAL max_parallel_workers_per_gather = 4; +SELECT trim_scale(variance(a) * 1e1000) FROM num_variance; + trim_scale +------------ + 12 +(1 row) + +ROLLBACK; +-- case where sum of squares would overflow but variance does not +DELETE FROM num_variance; +INSERT INTO num_variance SELECT 9e131071 + x FROM generate_series(1, 5) x; +SELECT variance(a) FROM num_variance; + variance +-------------------- + 2.5000000000000000 +(1 row) + +-- check that parallel execution produces the same result +BEGIN; +ALTER TABLE num_variance SET (parallel_workers = 4); +SET LOCAL parallel_setup_cost = 0; +SET LOCAL max_parallel_workers_per_gather = 4; +SELECT variance(a) FROM num_variance; + variance +-------------------- + 2.5000000000000000 +(1 row) + +ROLLBACK; +DROP TABLE num_variance; +-- -- Tests for GCD() -- SELECT a, b, gcd(a, b), gcd(a, -b), gcd(-b, a), gcd(-b, -a) diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql new file mode 100644 index db812c8..3784c52 --- a/src/test/regress/sql/numeric.sql +++ b/src/test/regress/sql/numeric.sql @@ -1278,6 +1278,42 @@ SELECT SUM(9999::numeric) FROM generate_ SELECT SUM((-9999)::numeric) FROM generate_series(1, 100000); -- +-- Tests for VARIANCE() +-- +CREATE TABLE num_variance (a numeric); +INSERT INTO num_variance VALUES (0); +INSERT INTO num_variance VALUES (3e-500); +INSERT INTO num_variance VALUES (-3e-500); +INSERT INTO num_variance VALUES (4e-500 - 1e-16383); +INSERT INTO num_variance VALUES (-4e-500 + 1e-16383); + +-- variance is just under 12.5e-1000 and so should round down to 12e-1000 +SELECT trim_scale(variance(a) * 1e1000) FROM num_variance; + +-- check that parallel execution produces the same result +BEGIN; +ALTER TABLE num_variance SET (parallel_workers = 4); +SET LOCAL parallel_setup_cost = 0; +SET LOCAL max_parallel_workers_per_gather = 4; +SELECT trim_scale(variance(a) * 1e1000) FROM num_variance; +ROLLBACK; + +-- case where sum 
of squares would overflow but variance does not +DELETE FROM num_variance; +INSERT INTO num_variance SELECT 9e131071 + x FROM generate_series(1, 5) x; +SELECT variance(a) FROM num_variance; + +-- check that parallel execution produces the same result +BEGIN; +ALTER TABLE num_variance SET (parallel_workers = 4); +SET LOCAL parallel_setup_cost = 0; +SET LOCAL max_parallel_workers_per_gather = 4; +SELECT variance(a) FROM num_variance; +ROLLBACK; + +DROP TABLE num_variance; + +-- -- Tests for GCD() -- SELECT a, b, gcd(a, b), gcd(a, -b), gcd(-b, a), gcd(-b, -a)