xiangfu0 commented on code in PR #18822:
URL: https://github.com/apache/pinot/pull/18822#discussion_r3457047406
##########
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java:
##########
@@ -173,7 +173,7 @@ public static class MergeRollupTask extends MergeTask {
/// with RealtimeToOfflineSegmentsTask, which performs the same rollup aggregation when configured with rollup
/// merge type.
public static final EnumSet<AggregationFunctionType> AVAILABLE_CORE_VALUE_AGGREGATORS =
- EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH,
+ EnumSet.of(MIN, MAX, SUM, AVG, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH,
Review Comment:
Addressed in a follow-up commit. Bytes-backed rollup aggregations (AVG, the
sketches, TDigest, KLL) are now validated at config time by
`MergeTaskUtils.validateAggregationColumnType`, which uses
`ValueAggregatorFactory.isBytesBacked(...)` to decide whether an aggregation
type needs a BYTES column. Both the MergeRollup and RealtimeToOffline task
generators call it. An `avg` (or any other bytes-backed) aggregationType on a
non-BYTES column is now rejected up front with a clear message instead of
failing with a ClassCastException at task runtime.
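For readers without the follow-up commit at hand, here is a minimal sketch of what such a config-time check could look like. It is not the code from the PR: the enums `AggType` and `ColumnType`, the membership of `BYTES_BACKED`, the method signatures, and the error message are all hypothetical stand-ins. Only the names `validateAggregationColumnType` and `isBytesBacked` come from the comment above.

```java
import java.util.EnumSet;

public class AggregationColumnTypeCheck {
  // Hypothetical stand-ins for Pinot's aggregation-type and data-type enums.
  enum AggType { MIN, MAX, SUM, AVG, DISTINCTCOUNTHLL, PERCENTILETDIGEST, PERCENTILEKLL }
  enum ColumnType { INT, LONG, FLOAT, DOUBLE, STRING, BYTES }

  // Assumed membership: the comment lists AVG, the sketches, TDigest and KLL as bytes-backed.
  private static final EnumSet<AggType> BYTES_BACKED =
      EnumSet.of(AggType.AVG, AggType.DISTINCTCOUNTHLL, AggType.PERCENTILETDIGEST, AggType.PERCENTILEKLL);

  static boolean isBytesBacked(AggType type) {
    return BYTES_BACKED.contains(type);
  }

  // Fails at config-validation time instead of letting a (byte[]) cast fail inside the running task.
  static void validateAggregationColumnType(String column, AggType type, ColumnType columnType) {
    if (isBytesBacked(type) && columnType != ColumnType.BYTES) {
      throw new IllegalStateException("Aggregation type " + type + " on column '" + column
          + "' requires a BYTES column, but the column type is " + columnType);
    }
  }

  public static void main(String[] args) {
    // Scalar aggregation on a numeric column passes.
    validateAggregationColumnType("clicks", AggType.SUM, ColumnType.LONG);
    // Bytes-backed aggregation on a BYTES column passes.
    validateAggregationColumnType("avgLatency", AggType.AVG, ColumnType.BYTES);
    try {
      // Bytes-backed aggregation on a non-BYTES column is rejected up front.
      validateAggregationColumnType("avgLatency", AggType.AVG, ColumnType.DOUBLE);
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Keeping the rule in one set-membership check means a newly added bytes-backed aggregator only needs an entry in the set, not another branch in each generator.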
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java:
##########
@@ -52,6 +52,8 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
return new MaxValueAggregator(dataType);
case SUM:
return new SumValueAggregator(dataType);
+ case AVG:
+ return new AvgValueAggregator();
Review Comment:
The fail-fast check runs at config-validation time in the task generators, so
a bad config is rejected before the task is ever scheduled. See
`MergeTaskUtils.validateAggregationColumnType`, driven by
`ValueAggregatorFactory.isBytesBacked(...)`. The check applies uniformly to all
bytes-backed aggregators (sketches, TDigest, KLL, AVG) rather than
special-casing AVG in the factory, which keeps the AVG case consistent with its
sibling entries.
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+
+
+/// Aggregator for merging serialized [AvgPair] (sum + count) values during segment processing
+/// (e.g. MergeRollupTask / RealtimeToOfflineSegmentsTask).
+///
+/// The column must store a serialized `AvgPair`, i.e. a `BYTES` column produced by an `AVG` ingestion
+/// aggregation (or star-tree). Merging adds the sums and counts so the average stays correct across
+/// multiple rollup levels — averaging already-averaged scalars would be wrong. The final `sum / count`
+/// division happens at query time in `AvgAggregationFunction`, which already reads the same serialized
+/// `AvgPair` format.
+public class AvgValueAggregator implements ValueAggregator {
+
+ /// Merges two serialized `AvgPair` values into one. An empty `byte[]` (the default null value for a `BYTES`
+ /// column) is treated as a missing value, so the other side is returned unchanged; if both are empty, the
+ /// empty value is returned.
+ @Override
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
+ byte[] bytes1 = (byte[]) value1;
+ byte[] bytes2 = (byte[]) value2;
+
+ // Treat empty byte arrays (default null value for BYTES columns) as missing values
+ if (bytes1.length == 0) {
+ return bytes2;
+ } else if (bytes2.length == 0) {
+ return bytes1;
+ }
+
+ AvgPair first = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes1);
+ AvgPair second = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes2);
+ first.apply(second);
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(first);
+ }
Review Comment:
Good catch, fixed. When both inputs are empty, the aggregator now returns a
serialized empty `AvgPair` (sum=0, count=0) instead of `byte[0]`, so nothing
unreadable is written into the merged segment. The single-empty case still
returns the other side's already-valid 16-byte value.
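The fixed merge behavior can be sketched in a self-contained way as follows. This is an illustration, not the PR's code: it replaces `AvgPair` and `ObjectSerDeUtils.AVG_PAIR_SER_DE` with a hand-rolled 16-byte layout (an 8-byte double sum followed by an 8-byte long count), which is an assumption made here to keep the example free of Pinot dependencies. The class and helper names are hypothetical.

```java
import java.nio.ByteBuffer;

public class AvgPairMergeSketch {
  // Assumed layout: 8-byte double sum, then 8-byte long count (16 bytes total).
  static byte[] serialize(double sum, long count) {
    return ByteBuffer.allocate(Double.BYTES + Long.BYTES).putDouble(sum).putLong(count).array();
  }

  static double sumOf(byte[] value) {
    return ByteBuffer.wrap(value).getDouble(0);
  }

  static long countOf(byte[] value) {
    return ByteBuffer.wrap(value).getLong(Double.BYTES);
  }

  // Merges two serialized pairs; an empty array stands for a missing value.
  static byte[] merge(byte[] first, byte[] second) {
    if (first.length == 0 && second.length == 0) {
      // Both missing: emit a readable empty pair rather than byte[0].
      return serialize(0.0, 0L);
    }
    if (first.length == 0) {
      return second;
    }
    if (second.length == 0) {
      return first;
    }
    // Add sums and counts; the sum / count division is deferred to query time.
    return serialize(sumOf(first) + sumOf(second), countOf(first) + countOf(second));
  }

  public static void main(String[] args) {
    byte[] merged = merge(serialize(30.0, 2L), serialize(20.0, 4L));
    System.out.println("sum=" + sumOf(merged) + " count=" + countOf(merged)); // sum=50.0 count=6

    byte[] bothEmpty = merge(new byte[0], new byte[0]);
    System.out.println("length=" + bothEmpty.length + " count=" + countOf(bothEmpty)); // length=16 count=0
  }
}
```

With the inputs from `testAggregateAddsSumAndCount`, the merged pair yields 50 / 6 ≈ 8.33, whereas averaging the two input averages (15 and 5) would give 10. That gap is why the aggregator carries sum and count rather than a scalar average.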
##########
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class AvgValueAggregatorTest {
+
+ private final Map<String, String> _functionParameters = new HashMap<>();
+ private AvgValueAggregator _aggregator;
+
+ @BeforeMethod
+ public void setUp() {
+ _aggregator = new AvgValueAggregator();
+ }
+
+ private static byte[] serialize(double sum, long count) {
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(new AvgPair(sum, count));
+ }
+
+ private AvgPair aggregate(byte[] value1, byte[] value2) {
+ byte[] result = (byte[]) _aggregator.aggregate(value1, value2, _functionParameters);
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(result);
+ }
+
+ @Test
+ public void testAggregateAddsSumAndCount() {
+ // (sum=30, count=2) avg 15 merged with (sum=20, count=4) avg 5 -> total sum 50, total count 6, avg 8.33...
+ AvgPair merged = aggregate(serialize(30.0, 2L), serialize(20.0, 4L));
+ assertEquals(merged.getSum(), 50.0);
+ assertEquals(merged.getCount(), 6L);
+ }
+
+ @Test
+ public void testTwoLevelRollupPreservesSumAndCount() {
+ // The average-of-averages trap: a second rollup pass over already-rolled-up rows must keep the
+ // running sum/count, not re-average the level-1 averages.
+ // Level-1 group A: 50 rows of value 2 -> (sum=100, count=50), avg 2
+ // Level-1 group B: 50 rows of value 4 -> (sum=200, count=50), avg 4
+ byte[] level1A = serialize(100.0, 50L);
+ byte[] level1B = serialize(200.0, 50L);
+
+ AvgPair level2 = aggregate(level1A, level1B);
+ assertEquals(level2.getSum(), 300.0);
+ assertEquals(level2.getCount(), 100L);
+ // Correct overall average is 3.0 (300/100), NOT 3.0 by accident of (2+4)/2 — verify via the pair.
+ assertEquals(level2.getSum() / level2.getCount(), 3.0);
+ }
+
+ @Test
+ public void testAggregateWithBothEmptyBytes() {
+ byte[] result = (byte[]) _aggregator.aggregate(new byte[0], new byte[0], _functionParameters);
+ // Both empty (default null value for BYTES columns) -> treated as missing, return empty bytes
+ assertEquals(result.length, 0);
+ }
Review Comment:
Updated: `testAggregateWithBothEmptyBytes` now deserializes the result and
asserts `(sum=0, count=0)` instead of expecting an empty array.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]