Copilot commented on code in PR #18822:
URL: https://github.com/apache/pinot/pull/18822#discussion_r3456927713


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+
+
+/// Aggregator for merging serialized [AvgPair] (sum + count) values during segment processing
+/// (e.g. MergeRollupTask / RealtimeToOfflineSegmentsTask).
+///
+/// The column must store a serialized `AvgPair`, i.e. a `BYTES` column produced by an `AVG` ingestion
+/// aggregation (or star-tree). Merging adds the sums and counts so the average stays correct across
+/// multiple rollup levels — averaging already-averaged scalars would be wrong. The final `sum / count`
+/// division happens at query time in `AvgAggregationFunction`, which already reads the same serialized
+/// `AvgPair` format.
+public class AvgValueAggregator implements ValueAggregator {
+
+  /// Merges two serialized `AvgPair` values into one. An empty `byte[]` (the default null value for a `BYTES`
+  /// column) is treated as a missing value, so the other side is returned unchanged; if both are empty, the
+  /// empty value is returned.
+  @Override
+  public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
+    byte[] bytes1 = (byte[]) value1;
+    byte[] bytes2 = (byte[]) value2;
+
+    // Treat empty byte arrays (default null value for BYTES columns) as missing values
+    if (bytes1.length == 0) {
+      return bytes2;
+    } else if (bytes2.length == 0) {
+      return bytes1;
+    }
+
+    AvgPair first = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes1);
+    AvgPair second = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes2);
+    first.apply(second);
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(first);
+  }

Review Comment:
   Returning `byte[0]` when both inputs are empty can propagate an invalid serialized value into merged segments. `AvgPair` deserialization (used by `AvgAggregationFunction`) expects 16 bytes and will throw on empty, so it’s safer to emit a valid serialized “empty” `AvgPair` (sum=0,count=0) when both sides are empty, similar to how `DistinctCountCPCSketchAggregator` avoids propagating `byte[0]`.



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java:
##########
@@ -52,6 +52,8 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
         return new MaxValueAggregator(dataType);
       case SUM:
         return new SumValueAggregator(dataType);
+      case AVG:
+        return new AvgValueAggregator();

Review Comment:
   `AVG` is overloaded in Pinot (scalar avg vs bytes-backed AvgPair). `ValueAggregatorFactory` currently returns `AvgValueAggregator` for `AVG` regardless of `dataType`, which will throw a `ClassCastException` at runtime if a user configures `aggregationType=avg` on a numeric column. Since enabling `AVG` also makes that configuration pass controller-side validation, it’s worth failing fast here with a clear error message unless the column is `BYTES` (serialized `AvgPair`).



##########
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class AvgValueAggregatorTest {
+
+  private final Map<String, String> _functionParameters = new HashMap<>();
+  private AvgValueAggregator _aggregator;
+
+  @BeforeMethod
+  public void setUp() {
+    _aggregator = new AvgValueAggregator();
+  }
+
+  private static byte[] serialize(double sum, long count) {
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(new AvgPair(sum, count));
+  }
+
+  private AvgPair aggregate(byte[] value1, byte[] value2) {
+    byte[] result = (byte[]) _aggregator.aggregate(value1, value2, _functionParameters);
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(result);
+  }
+
+  @Test
+  public void testAggregateAddsSumAndCount() {
+    // (sum=30, count=2) avg 15 merged with (sum=20, count=4) avg 5 -> total sum 50, total count 6, avg 8.33...
+    AvgPair merged = aggregate(serialize(30.0, 2L), serialize(20.0, 4L));
+    assertEquals(merged.getSum(), 50.0);
+    assertEquals(merged.getCount(), 6L);
+  }
+
+  @Test
+  public void testTwoLevelRollupPreservesSumAndCount() {
+    // The average-of-averages trap: a second rollup pass over already-rolled-up rows must keep the
+    // running sum/count, not re-average the level-1 averages.
+    // Level-1 group A: 50 rows of value 2 -> (sum=100, count=50), avg 2
+    // Level-1 group B: 50 rows of value 4 -> (sum=200, count=50), avg 4
+    byte[] level1A = serialize(100.0, 50L);
+    byte[] level1B = serialize(200.0, 50L);
+
+    AvgPair level2 = aggregate(level1A, level1B);
+    assertEquals(level2.getSum(), 300.0);
+    assertEquals(level2.getCount(), 100L);
+    // Correct overall average is 3.0 (300/100), NOT 3.0 by accident of (2+4)/2 — verify via the pair.
+    assertEquals(level2.getSum() / level2.getCount(), 3.0);
+  }
+
+  @Test
+  public void testAggregateWithBothEmptyBytes() {
+    byte[] result = (byte[]) _aggregator.aggregate(new byte[0], new byte[0], _functionParameters);
+    // Both empty (default null value for BYTES columns) -> treated as missing, return empty bytes
+    assertEquals(result.length, 0);
+  }

Review Comment:
   If `AvgValueAggregator` is updated to return a valid serialized empty `AvgPair` when both inputs are empty (to avoid propagating `byte[0]` into merged segments), this test should assert on the decoded `(sum,count)` instead of expecting an empty array.
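The both-empty suggestion on `AvgValueAggregator` (and the matching change to `testAggregateWithBothEmptyBytes`) could look roughly like the sketch below. It is a standalone illustration, not the PR's code: `AvgPairMergeSketch` and its helper methods are hypothetical stand-ins for `AvgPair` and `ObjectSerDeUtils.AVG_PAIR_SER_DE`, and the 16-byte layout (an 8-byte double `sum` followed by an 8-byte long `count`) is an assumption based on the "expects 16 bytes" remark in the comment.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for the merge logic; not Pinot's actual classes. */
final class AvgPairMergeSketch {
  // Assumed layout: double sum (8 bytes) followed by long count (8 bytes).
  static final int SERIALIZED_SIZE = Double.BYTES + Long.BYTES;

  static byte[] serialize(double sum, long count) {
    return ByteBuffer.allocate(SERIALIZED_SIZE).putDouble(sum).putLong(count).array();
  }

  static double sumOf(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getDouble(0);
  }

  static long countOf(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong(Double.BYTES);
  }

  /** Merges two serialized pairs, never returning an empty array. */
  static byte[] merge(byte[] bytes1, byte[] bytes2) {
    boolean empty1 = bytes1.length == 0;
    boolean empty2 = bytes2.length == 0;
    if (empty1 && empty2) {
      // Both sides missing: emit a valid "empty" pair instead of byte[0].
      return serialize(0.0, 0L);
    }
    if (empty1) {
      return bytes2;
    }
    if (empty2) {
      return bytes1;
    }
    return serialize(sumOf(bytes1) + sumOf(bytes2), countOf(bytes1) + countOf(bytes2));
  }
}
```

With this shape, the both-empty test would decode the result and check `sum == 0.0` and `count == 0` rather than asserting `result.length == 0`. Whether a zero-count pair is handled sensibly at query time is something the real `AvgAggregationFunction` path would need to confirm.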

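For the `ValueAggregatorFactory` comment, a fail-fast guard on the `AVG` case might be sketched as follows. This is only an illustration under stated assumptions: `AvgFactoryGuardSketch`, its nested `DataType` enum, and `avgAggregatorFor` are hypothetical stand-ins so the example compiles on its own, and the method returns a label where the real factory would return `new AvgValueAggregator()`.

```java
/** Hypothetical stand-in for the AVG branch of the factory; not Pinot's actual API. */
final class AvgFactoryGuardSketch {
  /** Minimal stand-in for Pinot's FieldSpec.DataType, limited to the values used here. */
  enum DataType { INT, LONG, FLOAT, DOUBLE, BYTES }

  /**
   * Rejects non-BYTES columns up front with a descriptive message, rather than
   * letting a later (byte[]) cast fail with a ClassCastException.
   */
  static String avgAggregatorFor(DataType dataType) {
    if (dataType != DataType.BYTES) {
      throw new IllegalArgumentException(
          "AVG aggregation requires a BYTES column holding a serialized AvgPair, got: " + dataType);
    }
    // The real factory would return new AvgValueAggregator() here.
    return "AvgValueAggregator";
  }
}
```

Throwing `IllegalArgumentException` is one option; the project may prefer a different exception type or a precondition helper, and the same check may also belong in the controller-side validation mentioned in the comment.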