On Fri, 2 Jul 2021 at 10:24, David Rowley <dgrowle...@gmail.com> wrote:
>
> I ran this again with a few different worker counts after tuning a few
> memory settings so there was no spilling to disk and so everything was
> in RAM. Mostly so I could get consistent results.
>
> Here are the results, averaged over 3 runs for each worker count:
>
> Workers  Master   Patched  Percent
> 8        11094.1  11084.9  100.08%
> 16        8711.4   8562.6  101.74%
> 32        6961.4   6726.3  103.50%
> 64        6137.4   5854.8  104.83%
> 128       6090.3   5747.4  105.96%
>

Thanks for testing again. Those are nice-looking results, and are much
more in line with what I was seeing.

> So the gains are much less at lower worker counts.  I think this is
> because most of the gains are in the serial part of the plan and with
> higher worker counts that part of the plan is relatively much bigger.
>
> So likely performance isn't too critical here, but it is something to
> keep in mind.
>

Yes, agreed. I suspect there's not much more that can be shaved off
this particular piece of code now though. Here's an update with the
last set of changes discussed.

Regards,
Dean
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
new file mode 100644
index eb78f0b..a8c6bbf
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -515,6 +515,9 @@ static void set_var_from_var(const Numer
 static char *get_str_from_var(const NumericVar *var);
 static char *get_str_from_var_sci(const NumericVar *var, int rscale);
 
+static void numericvar_serialize(StringInfo buf, const NumericVar *var);
+static void numericvar_deserialize(StringInfo buf, NumericVar *var);
+
 static Numeric duplicate_numeric(Numeric num);
 static Numeric make_result(const NumericVar *var);
 static Numeric make_result_opt_error(const NumericVar *var, bool *error);
@@ -4943,38 +4946,25 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
 {
 	NumericAggState *state;
 	StringInfoData buf;
-	Datum		temp;
-	bytea	   *sumX;
 	bytea	   *result;
 	NumericVar	tmp_var;
 
+	init_var(&tmp_var);
+
 	/* Ensure we disallow calling when not in aggregate context */
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
 
 	state = (NumericAggState *) PG_GETARG_POINTER(0);
 
-	/*
-	 * This is a little wasteful since make_result converts the NumericVar
-	 * into a Numeric and numeric_send converts it back again. Is it worth
-	 * splitting the tasks in numeric_send into separate functions to stop
-	 * this? Doing so would also remove the fmgr call overhead.
-	 */
-	init_var(&tmp_var);
-	accum_sum_final(&state->sumX, &tmp_var);
-
-	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&tmp_var)));
-	sumX = DatumGetByteaPP(temp);
-	free_var(&tmp_var);
-
 	pq_begintypsend(&buf);
 
 	/* N */
 	pq_sendint64(&buf, state->N);
 
 	/* sumX */
-	pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+	accum_sum_final(&state->sumX, &tmp_var);
+	numericvar_serialize(&buf, &tmp_var);
 
 	/* maxScale */
 	pq_sendint32(&buf, state->maxScale);
@@ -4993,6 +4983,8 @@ numeric_avg_serialize(PG_FUNCTION_ARGS)
 
 	result = pq_endtypsend(&buf);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_BYTEA_P(result);
 }
 
@@ -5006,9 +4998,10 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS
 {
 	bytea	   *sstate;
 	NumericAggState *result;
-	Datum		temp;
-	NumericVar	tmp_var;
 	StringInfoData buf;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5029,11 +5022,7 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS
 	result->N = pq_getmsgint64(&buf);
 
 	/* sumX */
-	temp = DirectFunctionCall3(numeric_recv,
-							   PointerGetDatum(&buf),
-							   ObjectIdGetDatum(InvalidOid),
-							   Int32GetDatum(-1));
-	init_var_from_num(DatumGetNumeric(temp), &tmp_var);
+	numericvar_deserialize(&buf, &tmp_var);
 	accum_sum_add(&(result->sumX), &tmp_var);
 
 	/* maxScale */
@@ -5054,6 +5043,8 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS
 	pq_getmsgend(&buf);
 	pfree(buf.data);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_POINTER(result);
 }
 
@@ -5067,11 +5058,10 @@ numeric_serialize(PG_FUNCTION_ARGS)
 {
 	NumericAggState *state;
 	StringInfoData buf;
-	Datum		temp;
-	bytea	   *sumX;
-	NumericVar	tmp_var;
-	bytea	   *sumX2;
 	bytea	   *result;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	/* Ensure we disallow calling when not in aggregate context */
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -5079,36 +5069,18 @@ numeric_serialize(PG_FUNCTION_ARGS)
 
 	state = (NumericAggState *) PG_GETARG_POINTER(0);
 
-	/*
-	 * This is a little wasteful since make_result converts the NumericVar
-	 * into a Numeric and numeric_send converts it back again. Is it worth
-	 * splitting the tasks in numeric_send into separate functions to stop
-	 * this? Doing so would also remove the fmgr call overhead.
-	 */
-	init_var(&tmp_var);
-
-	accum_sum_final(&state->sumX, &tmp_var);
-	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&tmp_var)));
-	sumX = DatumGetByteaPP(temp);
-
-	accum_sum_final(&state->sumX2, &tmp_var);
-	temp = DirectFunctionCall1(numeric_send,
-							   NumericGetDatum(make_result(&tmp_var)));
-	sumX2 = DatumGetByteaPP(temp);
-
-	free_var(&tmp_var);
-
 	pq_begintypsend(&buf);
 
 	/* N */
 	pq_sendint64(&buf, state->N);
 
 	/* sumX */
-	pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+	accum_sum_final(&state->sumX, &tmp_var);
+	numericvar_serialize(&buf, &tmp_var);
 
 	/* sumX2 */
-	pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2));
+	accum_sum_final(&state->sumX2, &tmp_var);
+	numericvar_serialize(&buf, &tmp_var);
 
 	/* maxScale */
 	pq_sendint32(&buf, state->maxScale);
@@ -5127,6 +5099,8 @@ numeric_serialize(PG_FUNCTION_ARGS)
 
 	result = pq_endtypsend(&buf);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_BYTEA_P(result);
 }
 
@@ -5140,10 +5114,10 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 {
 	bytea	   *sstate;
 	NumericAggState *result;
-	Datum		temp;
-	NumericVar	sumX_var;
-	NumericVar	sumX2_var;
 	StringInfoData buf;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5164,20 +5138,12 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 	result->N = pq_getmsgint64(&buf);
 
 	/* sumX */
-	temp = DirectFunctionCall3(numeric_recv,
-							   PointerGetDatum(&buf),
-							   ObjectIdGetDatum(InvalidOid),
-							   Int32GetDatum(-1));
-	init_var_from_num(DatumGetNumeric(temp), &sumX_var);
-	accum_sum_add(&(result->sumX), &sumX_var);
+	numericvar_deserialize(&buf, &tmp_var);
+	accum_sum_add(&(result->sumX), &tmp_var);
 
 	/* sumX2 */
-	temp = DirectFunctionCall3(numeric_recv,
-							   PointerGetDatum(&buf),
-							   ObjectIdGetDatum(InvalidOid),
-							   Int32GetDatum(-1));
-	init_var_from_num(DatumGetNumeric(temp), &sumX2_var);
-	accum_sum_add(&(result->sumX2), &sumX2_var);
+	numericvar_deserialize(&buf, &tmp_var);
+	accum_sum_add(&(result->sumX2), &tmp_var);
 
 	/* maxScale */
 	result->maxScale = pq_getmsgint(&buf, 4);
@@ -5197,6 +5163,8 @@ numeric_deserialize(PG_FUNCTION_ARGS)
 	pq_getmsgend(&buf);
 	pfree(buf.data);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_POINTER(result);
 }
 
@@ -5459,9 +5427,10 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
 {
 	PolyNumAggState *state;
 	StringInfoData buf;
-	bytea	   *sumX;
-	bytea	   *sumX2;
 	bytea	   *result;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	/* Ensure we disallow calling when not in aggregate context */
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -5477,32 +5446,6 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
 	 * day we might like to send these over to another server for further
 	 * processing and we want a standard format to work with.
 	 */
-	{
-		Datum		temp;
-		NumericVar	num;
-
-		init_var(&num);
-
-#ifdef HAVE_INT128
-		int128_to_numericvar(state->sumX, &num);
-#else
-		accum_sum_final(&state->sumX, &num);
-#endif
-		temp = DirectFunctionCall1(numeric_send,
-								   NumericGetDatum(make_result(&num)));
-		sumX = DatumGetByteaPP(temp);
-
-#ifdef HAVE_INT128
-		int128_to_numericvar(state->sumX2, &num);
-#else
-		accum_sum_final(&state->sumX2, &num);
-#endif
-		temp = DirectFunctionCall1(numeric_send,
-								   NumericGetDatum(make_result(&num)));
-		sumX2 = DatumGetByteaPP(temp);
-
-		free_var(&num);
-	}
 
 	pq_begintypsend(&buf);
 
@@ -5510,13 +5453,25 @@ numeric_poly_serialize(PG_FUNCTION_ARGS)
 	pq_sendint64(&buf, state->N);
 
 	/* sumX */
-	pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+#ifdef HAVE_INT128
+	int128_to_numericvar(state->sumX, &tmp_var);
+#else
+	accum_sum_final(&state->sumX, &tmp_var);
+#endif
+	numericvar_serialize(&buf, &tmp_var);
 
 	/* sumX2 */
-	pq_sendbytes(&buf, VARDATA_ANY(sumX2), VARSIZE_ANY_EXHDR(sumX2));
+#ifdef HAVE_INT128
+	int128_to_numericvar(state->sumX2, &tmp_var);
+#else
+	accum_sum_final(&state->sumX2, &tmp_var);
+#endif
+	numericvar_serialize(&buf, &tmp_var);
 
 	result = pq_endtypsend(&buf);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_BYTEA_P(result);
 }
 
@@ -5530,11 +5485,10 @@ numeric_poly_deserialize(PG_FUNCTION_ARG
 {
 	bytea	   *sstate;
 	PolyNumAggState *result;
-	Datum		sumX;
-	NumericVar	sumX_var;
-	Datum		sumX2;
-	NumericVar	sumX2_var;
 	StringInfoData buf;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5555,34 +5509,26 @@ numeric_poly_deserialize(PG_FUNCTION_ARG
 	result->N = pq_getmsgint64(&buf);
 
 	/* sumX */
-	sumX = DirectFunctionCall3(numeric_recv,
-							   PointerGetDatum(&buf),
-							   ObjectIdGetDatum(InvalidOid),
-							   Int32GetDatum(-1));
-
-	/* sumX2 */
-	sumX2 = DirectFunctionCall3(numeric_recv,
-								PointerGetDatum(&buf),
-								ObjectIdGetDatum(InvalidOid),
-								Int32GetDatum(-1));
-
-	init_var_from_num(DatumGetNumeric(sumX), &sumX_var);
+	numericvar_deserialize(&buf, &tmp_var);
 #ifdef HAVE_INT128
-	numericvar_to_int128(&sumX_var, &result->sumX);
+	numericvar_to_int128(&tmp_var, &result->sumX);
 #else
-	accum_sum_add(&result->sumX, &sumX_var);
+	accum_sum_add(&result->sumX, &tmp_var);
 #endif
 
-	init_var_from_num(DatumGetNumeric(sumX2), &sumX2_var);
+	/* sumX2 */
+	numericvar_deserialize(&buf, &tmp_var);
 #ifdef HAVE_INT128
-	numericvar_to_int128(&sumX2_var, &result->sumX2);
+	numericvar_to_int128(&tmp_var, &result->sumX2);
 #else
-	accum_sum_add(&result->sumX2, &sumX2_var);
+	accum_sum_add(&result->sumX2, &tmp_var);
 #endif
 
 	pq_getmsgend(&buf);
 	pfree(buf.data);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_POINTER(result);
 }
 
@@ -5681,8 +5627,10 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
 {
 	PolyNumAggState *state;
 	StringInfoData buf;
-	bytea	   *sumX;
 	bytea	   *result;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	/* Ensure we disallow calling when not in aggregate context */
 	if (!AggCheckCallContext(fcinfo, NULL))
@@ -5698,23 +5646,6 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
 	 * like to send these over to another server for further processing and we
 	 * want a standard format to work with.
 	 */
-	{
-		Datum		temp;
-		NumericVar	num;
-
-		init_var(&num);
-
-#ifdef HAVE_INT128
-		int128_to_numericvar(state->sumX, &num);
-#else
-		accum_sum_final(&state->sumX, &num);
-#endif
-		temp = DirectFunctionCall1(numeric_send,
-								   NumericGetDatum(make_result(&num)));
-		sumX = DatumGetByteaPP(temp);
-
-		free_var(&num);
-	}
 
 	pq_begintypsend(&buf);
 
@@ -5722,10 +5653,17 @@ int8_avg_serialize(PG_FUNCTION_ARGS)
 	pq_sendint64(&buf, state->N);
 
 	/* sumX */
-	pq_sendbytes(&buf, VARDATA_ANY(sumX), VARSIZE_ANY_EXHDR(sumX));
+#ifdef HAVE_INT128
+	int128_to_numericvar(state->sumX, &tmp_var);
+#else
+	accum_sum_final(&state->sumX, &tmp_var);
+#endif
+	numericvar_serialize(&buf, &tmp_var);
 
 	result = pq_endtypsend(&buf);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_BYTEA_P(result);
 }
 
@@ -5739,8 +5677,9 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 	bytea	   *sstate;
 	PolyNumAggState *result;
 	StringInfoData buf;
-	Datum		temp;
-	NumericVar	num;
+	NumericVar	tmp_var;
+
+	init_var(&tmp_var);
 
 	if (!AggCheckCallContext(fcinfo, NULL))
 		elog(ERROR, "aggregate function called in non-aggregate context");
@@ -5761,20 +5700,18 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 	result->N = pq_getmsgint64(&buf);
 
 	/* sumX */
-	temp = DirectFunctionCall3(numeric_recv,
-							   PointerGetDatum(&buf),
-							   ObjectIdGetDatum(InvalidOid),
-							   Int32GetDatum(-1));
-	init_var_from_num(DatumGetNumeric(temp), &num);
+	numericvar_deserialize(&buf, &tmp_var);
 #ifdef HAVE_INT128
-	numericvar_to_int128(&num, &result->sumX);
+	numericvar_to_int128(&tmp_var, &result->sumX);
 #else
-	accum_sum_add(&result->sumX, &num);
+	accum_sum_add(&result->sumX, &tmp_var);
 #endif
 
 	pq_getmsgend(&buf);
 	pfree(buf.data);
 
+	free_var(&tmp_var);
+
 	PG_RETURN_POINTER(result);
 }
 
@@ -7286,6 +7223,48 @@ get_str_from_var_sci(const NumericVar *v
 }
 
 
+/*
+ * numericvar_serialize - serialize NumericVar to binary format
+ *
+ * At variable level, no checks are performed on the weight or dscale, allowing
+ * us to pass around intermediate values with higher precision than supported
+ * by the numeric type.  Note: this is incompatible with numeric_send/recv(),
+ * which use 16-bit integers for these fields.
+ */
+static void
+numericvar_serialize(StringInfo buf, const NumericVar *var)
+{
+	int			i;
+
+	pq_sendint32(buf, var->ndigits);
+	pq_sendint32(buf, var->weight);
+	pq_sendint32(buf, var->sign);
+	pq_sendint32(buf, var->dscale);
+	for (i = 0; i < var->ndigits; i++)
+		pq_sendint16(buf, var->digits[i]);
+}
+
+/*
+ * numericvar_deserialize - deserialize binary format to NumericVar
+ */
+static void
+numericvar_deserialize(StringInfo buf, NumericVar *var)
+{
+	int			len,
+				i;
+
+	len = pq_getmsgint(buf, sizeof(int32));
+
+	alloc_var(var, len);		/* sets var->ndigits */
+
+	var->weight = pq_getmsgint(buf, sizeof(int32));
+	var->sign = pq_getmsgint(buf, sizeof(int32));
+	var->dscale = pq_getmsgint(buf, sizeof(int32));
+	for (i = 0; i < len; i++)
+		var->digits[i] = pq_getmsgint(buf, sizeof(int16));
+}
+
+
 /*
  * duplicate_numeric() - copy a packed-format Numeric
  *
diff --git a/src/test/regress/expected/numeric.out b/src/test/regress/expected/numeric.out
new file mode 100644
index 30a5642..4ad4851
--- a/src/test/regress/expected/numeric.out
+++ b/src/test/regress/expected/numeric.out
@@ -2967,6 +2967,56 @@ SELECT SUM((-9999)::numeric) FROM genera
 (1 row)
 
 --
+-- Tests for VARIANCE()
+--
+CREATE TABLE num_variance (a numeric);
+INSERT INTO num_variance VALUES (0);
+INSERT INTO num_variance VALUES (3e-500);
+INSERT INTO num_variance VALUES (-3e-500);
+INSERT INTO num_variance VALUES (4e-500 - 1e-16383);
+INSERT INTO num_variance VALUES (-4e-500 + 1e-16383);
+-- variance is just under 12.5e-1000 and so should round down to 12e-1000
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+ trim_scale 
+------------
+         12
+(1 row)
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+ trim_scale 
+------------
+         12
+(1 row)
+
+ROLLBACK;
+-- case where sum of squares would overflow but variance does not
+DELETE FROM num_variance;
+INSERT INTO num_variance SELECT 9e131071 + x FROM generate_series(1, 5) x;
+SELECT variance(a) FROM num_variance;
+      variance      
+--------------------
+ 2.5000000000000000
+(1 row)
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT variance(a) FROM num_variance;
+      variance      
+--------------------
+ 2.5000000000000000
+(1 row)
+
+ROLLBACK;
+DROP TABLE num_variance;
+--
 -- Tests for GCD()
 --
 SELECT a, b, gcd(a, b), gcd(a, -b), gcd(-b, a), gcd(-b, -a)
diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql
new file mode 100644
index db812c8..3784c52
--- a/src/test/regress/sql/numeric.sql
+++ b/src/test/regress/sql/numeric.sql
@@ -1278,6 +1278,42 @@ SELECT SUM(9999::numeric) FROM generate_
 SELECT SUM((-9999)::numeric) FROM generate_series(1, 100000);
 
 --
+-- Tests for VARIANCE()
+--
+CREATE TABLE num_variance (a numeric);
+INSERT INTO num_variance VALUES (0);
+INSERT INTO num_variance VALUES (3e-500);
+INSERT INTO num_variance VALUES (-3e-500);
+INSERT INTO num_variance VALUES (4e-500 - 1e-16383);
+INSERT INTO num_variance VALUES (-4e-500 + 1e-16383);
+
+-- variance is just under 12.5e-1000 and so should round down to 12e-1000
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT trim_scale(variance(a) * 1e1000) FROM num_variance;
+ROLLBACK;
+
+-- case where sum of squares would overflow but variance does not
+DELETE FROM num_variance;
+INSERT INTO num_variance SELECT 9e131071 + x FROM generate_series(1, 5) x;
+SELECT variance(a) FROM num_variance;
+
+-- check that parallel execution produces the same result
+BEGIN;
+ALTER TABLE num_variance SET (parallel_workers = 4);
+SET LOCAL parallel_setup_cost = 0;
+SET LOCAL max_parallel_workers_per_gather = 4;
+SELECT variance(a) FROM num_variance;
+ROLLBACK;
+
+DROP TABLE num_variance;
+
+--
 -- Tests for GCD()
 --
 SELECT a, b, gcd(a, b), gcd(a, -b), gcd(-b, a), gcd(-b, -a)

Reply via email to