On Sat, 3 Sept 2022 at 00:37, Ranier Vilela <[email protected]> wrote:
>> But +1 to fix this and other issues even if they would never crash.
Yeah, I don't think any of this coding would lead to a crash, but it's
pretty weird coding and we should fix it.
> 1. Once that ranges->nranges is invariant, avoid the loop if ranges->nranges
> <= 0.
> This matches the current behavior.
>
> 2. Once that ranges->nsorted is invariant, avoid the loop if ranges->nsorted
> <= 0.
> This matches the current behavior.
>
> 3. Remove the invariant cxt from ranges->nsorted loop.
>
> 4. Avoid possible overflows when using int to store length strings.
>
> 5. Avoid possible out-of-bounds when ranges->nranges == 0.
>
> 6. Avoid overhead when using unnecessary StringInfoData to convert Datum a to
> Text b.
I've ripped out #4 and #6 for now. I think we should do #6 in master
only, probably as part of a wider cleanup of StringInfo misuse.
I also spent some time trying to ensure I understand this code
correctly. I was unable to work out what the nsorted field was from
just the comments. I needed to look at the code to figure it out, so I
think the comments for that field need to be improved. A few of the
others were not that clear either. I hope I've improved them in the
attached.
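
To make the layout concrete, here is a minimal sketch of how I read the
fields, assuming plain ints in place of Datums and a fixed-size array.
The struct SketchRanges and the helper names are hypothetical and exist
only for illustration; they are not part of brin_minmax_multi.c.

```c
#include <assert.h>

/* Hypothetical, simplified stand-in for the Ranges struct. */
typedef struct SketchRanges
{
	int			nranges;	/* number of ranges in values[] */
	int			nsorted;	/* number of point values which are sorted */
	int			nvalues;	/* number of point values in values[] */
	int			maxvalues;	/* number of elements in values[] */
	int			values[16];	/* 2 * nranges bounds, then nvalues points */
} SketchRanges;

/* Index of the first single point value, just after the range bounds. */
int
first_point_index(const SketchRanges *r)
{
	return 2 * r->nranges;
}

/* Index of the first point value that is not yet sorted. */
int
first_unsorted_index(const SketchRanges *r)
{
	assert(r->nsorted <= r->nvalues);
	return 2 * r->nranges + r->nsorted;
}

/* Number of array elements currently in use. */
int
used_elements(const SketchRanges *r)
{
	assert(2 * r->nranges + r->nvalues <= r->maxvalues);
	return 2 * r->nranges + r->nvalues;
}
```

With two ranges and three points, of which only the first is sorted,
the points start at index 4 and the unsorted part starts at index 5.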
I was also a bit confused at various other comments. e.g:
/*
* Is the value greater than the maxval? If yes, we'll recurse to the
* right side of range array.
*/
I don't see any sort of recursion going on here. All I see is the
skipping of values that are below the lower bound of the lowest range
or above the upper bound of the highest range.
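
To illustrate what that check amounts to, here is a minimal sketch,
assuming plain ints in place of Datums and < / > in place of the
FunctionCall2Coll comparisons. The function name value_in_any_range is
hypothetical; this is not the code in AssertCheckRanges, just the same
shape: two early exits followed by an iterative binary search.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical simplification: bounds[] holds nranges sorted,
 * non-overlapping [min, max] pairs, stored as 2 * nranges ints.
 */
bool
value_in_any_range(const int *bounds, int nranges, int value)
{
	int			start;
	int			end;

	assert(nranges >= 0);

	/* no ranges, so nothing can match */
	if (nranges == 0)
		return false;

	/* below the lower bound of the first range */
	if (value < bounds[0])
		return false;

	/* above the upper bound of the last range */
	if (value > bounds[2 * nranges - 1])
		return false;

	/* iterative binary search over the ranges, no recursion involved */
	start = 0;
	end = nranges - 1;
	while (start <= end)
	{
		int			midpoint = start + (end - start) / 2;

		if (value < bounds[2 * midpoint])
			end = midpoint - 1;
		else if (value > bounds[2 * midpoint + 1])
			start = midpoint + 1;
		else
			return true;	/* value falls inside this range */
	}

	return false;
}
```

In the assert-only code, a true result here corresponds to the
Assert(false) case: a point value that is covered by one of the ranges.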
I propose to backpatch the attached into v14 tomorrow morning (about
12 hours from now).
The only other weird coding I found was in brin_range_deserialize:
for (i = 0; (i < nvalues) && (!typbyval); i++)
I imagine most compilers would optimize that so that the typbyval
check is done once before the loop rather than on every iteration, but
I don't think that coding practice helps the human readers out much. I
left that one alone, for now.
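
For illustration only, here is a sketch of the two loop shapes,
assuming a trivial loop body that just counts iterations. The function
names are hypothetical; this is not the code in brin_range_deserialize,
and the attached patch does not change that loop.

```c
#include <assert.h>
#include <stdbool.h>

/* Loop-invariant test kept in the loop condition, as in the original. */
int
count_original(int nvalues, bool typbyval)
{
	int			i;
	int			n = 0;

	for (i = 0; (i < nvalues) && (!typbyval); i++)
		n++;

	return n;
}

/* Same behavior with the invariant test hoisted out of the loop. */
int
count_hoisted(int nvalues, bool typbyval)
{
	int			i;
	int			n = 0;

	if (!typbyval)
	{
		for (i = 0; i < nvalues; i++)
			n++;
	}

	return n;
}

/* Both forms run the body the same number of times. */
void
check_forms_agree(int nvalues)
{
	assert(count_original(nvalues, true) == count_hoisted(nvalues, true));
	assert(count_original(nvalues, false) == count_hoisted(nvalues, false));
}
```

The hoisted form makes it obvious to the reader that nothing happens
at all for by-value types.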
David
diff --git a/src/backend/access/brin/brin_minmax_multi.c b/src/backend/access/brin/brin_minmax_multi.c
index a581659fe2..99ef97206a 100644
--- a/src/backend/access/brin/brin_minmax_multi.c
+++ b/src/backend/access/brin/brin_minmax_multi.c
@@ -149,13 +149,16 @@ typedef struct MinMaxMultiOptions
* single-point ranges. That is, we have (2*nranges + nvalues) boundary
* values in the array.
*
- * +---------------------------------+-------------------------------+
- * | ranges (sorted pairs of values) | sorted values (single points) |
- * +---------------------------------+-------------------------------+
+ * +-------------------------+----------------------------------+
+ * | ranges (2 * nranges of) | single point values (nvalues of) |
+ * +-------------------------+----------------------------------+
*
* This allows us to quickly add new values, and store outliers without
* making the other ranges very wide.
*
+ * 'nsorted' denotes how many of 'nvalues' in the values[] array are sorted.
+ * When nsorted == nvalues, all single point values are sorted.
+ *
* We never store more than maxvalues values (as set by values_per_range
* reloption). If needed we merge some of the ranges.
*
@@ -173,10 +176,10 @@ typedef struct Ranges
FmgrInfo *cmp;
/* (2*nranges + nvalues) <= maxvalues */
- int nranges; /* number of ranges in the array (stored) */
- int nsorted; /* number of sorted values (ranges + points) */
- int nvalues; /* number of values in the data array (all) */
- int maxvalues; /* maximum number of values (reloption) */
+ int nranges; /* number of ranges in the values[] array */
+ int nsorted; /* number of nvalues which are sorted */
+ int nvalues; /* number of point values in values[] */
+ int maxvalues; /* number of elements in the values[] array */
/*
* We simply add the values into a large buffer, without any expensive
@@ -318,102 +321,99 @@ AssertCheckRanges(Ranges *ranges, FmgrInfo *cmpFn, Oid colloid)
* Check that none of the values are not covered by ranges (both sorted
* and unsorted)
*/
- for (i = 0; i < ranges->nvalues; i++)
+ if (ranges->nranges > 0)
{
- Datum compar;
- int start,
- end;
- Datum minvalue,
- maxvalue;
-
- Datum value = ranges->values[2 * ranges->nranges + i];
-
- if (ranges->nranges == 0)
- break;
-
- minvalue = ranges->values[0];
- maxvalue = ranges->values[2 * ranges->nranges - 1];
-
- /*
- * Is the value smaller than the minval? If yes, we'll recurse to the
- * left side of range array.
- */
- compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue);
-
- /* smaller than the smallest value in the first range */
- if (DatumGetBool(compar))
- continue;
-
- /*
- * Is the value greater than the maxval? If yes, we'll recurse to the
- * right side of range array.
- */
- compar = FunctionCall2Coll(cmpFn, colloid, maxvalue, value);
-
- /* larger than the largest value in the last range */
- if (DatumGetBool(compar))
- continue;
-
- start = 0; /* first range */
- end = ranges->nranges - 1; /* last range */
- while (true)
+ for (i = 0; i < ranges->nvalues; i++)
{
- int midpoint = (start + end) / 2;
-
- /* this means we ran out of ranges in the last step */
- if (start > end)
- break;
+ Datum compar;
+ int start,
+ end;
+ Datum minvalue = ranges->values[0];
+ Datum maxvalue = ranges->values[2 * ranges->nranges - 1];
+ Datum value = ranges->values[2 * ranges->nranges + i];
- /* copy the min/max values from the ranges */
- minvalue = ranges->values[2 * midpoint];
- maxvalue = ranges->values[2 * midpoint + 1];
+ compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue);
/*
- * Is the value smaller than the minval? If yes, we'll recurse to
- * the left side of range array.
+ * If the value is smaller than the lower bound in the first range
+ * then it cannot possibly be in any of the ranges.
*/
- compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue);
-
- /* smaller than the smallest value in this range */
if (DatumGetBool(compar))
- {
- end = (midpoint - 1);
continue;
- }
- /*
- * Is the value greater than the minval? If yes, we'll recurse to
- * the right side of range array.
- */
compar = FunctionCall2Coll(cmpFn, colloid, maxvalue, value);
- /* larger than the largest value in this range */
+ /*
+ * Likewise, if the value is larger than the upper bound of the
+ * final range, then it cannot possibly be inside any of the
+ * ranges.
+ */
if (DatumGetBool(compar))
- {
- start = (midpoint + 1);
continue;
- }
- /* hey, we found a matching range */
- Assert(false);
+ /* bsearch the ranges to see if 'value' fits within any of them */
+ start = 0; /* first range */
+ end = ranges->nranges - 1; /* last range */
+ while (true)
+ {
+ int midpoint = (start + end) / 2;
+
+ /* this means we ran out of ranges in the last step */
+ if (start > end)
+ break;
+
+ /* copy the min/max values from the ranges */
+ minvalue = ranges->values[2 * midpoint];
+ maxvalue = ranges->values[2 * midpoint + 1];
+
+ /*
+ * Is the value smaller than the minval? If yes, we'll recurse
+ * to the left side of range array.
+ */
+ compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue);
+
+ /* smaller than the smallest value in this range */
+ if (DatumGetBool(compar))
+ {
+ end = (midpoint - 1);
+ continue;
+ }
+
+ /*
+ * Is the value greater than the minval? If yes, we'll recurse
+ * to the right side of range array.
+ */
+ compar = FunctionCall2Coll(cmpFn, colloid, maxvalue, value);
+
+ /* larger than the largest value in this range */
+ if (DatumGetBool(compar))
+ {
+ start = (midpoint + 1);
+ continue;
+ }
+
+ /* hey, we found a matching range */
+ Assert(false);
+ }
}
}
- /* and values in the unsorted part must not be in sorted part */
- for (i = ranges->nsorted; i < ranges->nvalues; i++)
+ /* and values in the unsorted part must not be in the sorted part */
+ if (ranges->nsorted > 0)
{
compare_context cxt;
- Datum value = ranges->values[2 * ranges->nranges + i];
-
- if (ranges->nsorted == 0)
- break;
cxt.colloid = ranges->colloid;
cxt.cmpFn = ranges->cmp;
- Assert(bsearch_arg(&value, &ranges->values[2 * ranges->nranges],
- ranges->nsorted, sizeof(Datum),
- compare_values, (void *) &cxt) == NULL);
+ for (i = ranges->nsorted; i < ranges->nvalues; i++)
+ {
+ Datum value = ranges->values[2 * ranges->nranges + i];
+
+ Assert(bsearch_arg(&value, &ranges->values[2 * ranges->nranges],
+ ranges->nsorted, sizeof(Datum),
+ compare_values, (void *) &cxt) == NULL);
+ }
}
#endif
}
@@ -923,8 +923,8 @@ has_matching_range(BrinDesc *bdesc, Oid colloid, Ranges *ranges,
{
Datum compar;
- Datum minvalue = ranges->values[0];
- Datum maxvalue = ranges->values[2 * ranges->nranges - 1];
+ Datum minvalue;
+ Datum maxvalue;
FmgrInfo *cmpLessFn;
FmgrInfo *cmpGreaterFn;
@@ -936,6 +936,9 @@ has_matching_range(BrinDesc *bdesc, Oid colloid, Ranges *ranges,
if (ranges->nranges == 0)
return false;
+ minvalue = ranges->values[0];
+ maxvalue = ranges->values[2 * ranges->nranges - 1];
+
/*
* Otherwise, need to compare the new value with boundaries of all the
* ranges. First check if it's less than the absolute minimum, which is