This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e44e35d657b Support AVG aggregation in MergeRollupTask and
RealtimeToOfflineSegmentsTask (#18822)
e44e35d657b is described below
commit e44e35d657b2c605b83dec8e2401d0bbb153f75c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Jun 22 23:55:04 2026 -0700
Support AVG aggregation in MergeRollupTask and
RealtimeToOfflineSegmentsTask (#18822)
* Support AVG aggregation in MergeRollupTask and
RealtimeToOfflineSegmentsTask
AVG is computed from a serialized AvgPair (sum + count) stored in a BYTES
column produced by an AVG ingestion aggregation or star-tree. Merging adds
the sums and counts so the average stays correct across multiple rollup
levels; averaging already-averaged scalars would be wrong for groups with
unequal counts. The query-time AvgAggregationFunction already reads the same
serialized AvgPair format, so no query-path change is needed.
- Add AvgValueAggregator to the segment-processing rollup reducer
- Register AVG in the pinot-core processing ValueAggregatorFactory
- Allow AVG in MinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS
- Add unit + executor tests for both MergeRollup and RealtimeToOffline,
covering unequal counts and two-level rollup
* Validate bytes-backed rollup aggregation columns are declared as BYTES
The merge-rollup and realtime-to-offline task generators previously validated
the configured aggregationType only against AVAILABLE_CORE_VALUE_AGGREGATORS,
not the target column's dataType. A bytes-backed aggregation (AVG, the sketch
types, TDigest, KLL) configured on a non-BYTES column passed config validation
but failed with a ClassCastException at task runtime.
Add ValueAggregatorFactory.isBytesBacked() (co-located with the aggregator
switch as a single source of truth) and
MergeTaskUtils.validateAggregationColumnType(), called from both generators,
to reject such configs at config time with a clear message, uniformly for all
bytes-backed types.
* Emit a valid empty AvgPair instead of byte[0] when merging two empty values
When both inputs to the merge-rollup AvgValueAggregator are empty (the default
null value for a BYTES column), it previously returned an empty byte[], which
would be written into the merged segment and then fail AvgPair deserialization
(expects 16 bytes) in the query-time AVG function. Return a serialized empty
AvgPair (sum=0, count=0) instead, which the query path handles as a no-data
row.
---
.../apache/pinot/core/common/MinionConstants.java | 2 +-
.../processing/aggregator/AvgValueAggregator.java | 62 ++++
.../aggregator/ValueAggregatorFactory.java | 35 ++-
.../aggregator/AvgValueAggregatorTest.java | 114 +++++++
.../pinot/plugin/minion/tasks/MergeTaskUtils.java | 19 ++
.../mergerollup/MergeRollupTaskGenerator.java | 1 +
.../RealtimeToOfflineSegmentsTaskGenerator.java | 1 +
.../MergeRollupAvgTaskExecutorTest.java | 330 +++++++++++++++++++++
.../mergerollup/MergeRollupTaskGeneratorTest.java | 40 ++-
.../RealtimeToOfflineSegmentsTaskExecutorTest.java | 94 ++++++
...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 46 ++-
11 files changed, 738 insertions(+), 6 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 4ae4f8baf1b..976216d06a9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -173,7 +173,7 @@ public class MinionConstants {
/// with RealtimeToOfflineSegmentsTask, which performs the same rollup
aggregation when configured with rollup
/// merge type.
public static final EnumSet<AggregationFunctionType>
AVAILABLE_CORE_VALUE_AGGREGATORS =
- EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL,
DISTINCTCOUNTTHETASKETCH,
+ EnumSet.of(MIN, MAX, SUM, AVG, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL,
DISTINCTCOUNTTHETASKETCH,
DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH,
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH,
DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH,
DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java
new file mode 100644
index 00000000000..762080f6c6f
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+
+
+/// Aggregator for merging serialized [AvgPair] (sum + count) values during
segment processing
+/// (e.g. MergeRollupTask / RealtimeToOfflineSegmentsTask).
+///
+/// The column must store a serialized `AvgPair`, i.e. a `BYTES` column
produced by an `AVG` ingestion
+/// aggregation (or star-tree). Merging adds the sums and counts so the
average stays correct across
+/// multiple rollup levels — averaging already-averaged scalars would be
wrong. The final `sum / count`
+/// division happens at query time in `AvgAggregationFunction`, which already
reads the same serialized
+/// `AvgPair` format.
+public class AvgValueAggregator implements ValueAggregator {
+
+ /// Merges two serialized `AvgPair` values into one. An empty `byte[]` (the
default null value for a `BYTES`
+ /// column) is treated as a missing value, so the other side is returned
unchanged; if both are empty, a serialized
+ /// empty `AvgPair` (sum=0, count=0) is returned rather than an empty
`byte[]`, which `AvgPair` deserialization
+ /// (16 bytes) cannot read.
+ @Override
+ public Object aggregate(Object value1, Object value2, Map<String, String>
functionParameters) {
+ byte[] bytes1 = (byte[]) value1;
+ byte[] bytes2 = (byte[]) value2;
+
+ // Treat empty byte arrays (default null value for BYTES columns) as
missing values. When both sides are empty,
+ // emit a valid serialized empty AvgPair rather than propagating an empty
byte[] that cannot be deserialized.
+ if (bytes1.length == 0 && bytes2.length == 0) {
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(new AvgPair());
+ }
+ if (bytes1.length == 0) {
+ return bytes2;
+ }
+ if (bytes2.length == 0) {
+ return bytes1;
+ }
+
+ AvgPair first = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes1);
+ AvgPair second = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes2);
+ first.apply(second);
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(first);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 36dbc1e7ff2..1d5aa5e3978 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -39,11 +39,42 @@ public class ValueAggregatorFactory {
|| aggregationType == AggregationFunctionType.LASTWITHTIME;
}
+ /// Returns `true` if the rollup ValueAggregator for the given type stores
its aggregated value as a serialized
+ /// object (sketch, AvgPair, TDigest, ...) and therefore reads the metric
column value as a `byte[]`, i.e. the
+ /// column must be a `BYTES` column. Numeric (MIN/MAX/SUM) and time-ordered
(FIRSTWITHTIME/LASTWITHTIME) aggregators
+ /// keep the column's native type. Keep this in sync with
[#getValueAggregator]: a type is bytes-backed iff its
+ /// aggregator deserializes the column value as `byte[]`.
+ public static boolean isBytesBacked(AggregationFunctionType aggregationType)
{
+ switch (aggregationType) {
+ case AVG:
+ case DISTINCTCOUNTHLL:
+ case DISTINCTCOUNTRAWHLL:
+ case DISTINCTCOUNTTHETASKETCH:
+ case DISTINCTCOUNTRAWTHETASKETCH:
+ case DISTINCTCOUNTTUPLESKETCH:
+ case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+ case SUMVALUESINTEGERSUMTUPLESKETCH:
+ case AVGVALUEINTEGERSUMTUPLESKETCH:
+ case DISTINCTCOUNTCPCSKETCH:
+ case DISTINCTCOUNTRAWCPCSKETCH:
+ case DISTINCTCOUNTULL:
+ case DISTINCTCOUNTRAWULL:
+ case PERCENTILEKLL:
+ case PERCENTILERAWKLL:
+ case PERCENTILETDIGEST:
+ case PERCENTILERAWTDIGEST:
+ return true;
+ default:
+ return false;
+ }
+ }
+
/// Constructs a ValueAggregator from the given aggregation type.
///
/// When adding entries to this please add them to the Set named
AVAILABLE_CORE_VALUE_AGGREGATORS in
/// org.apache.pinot.core.common.MinionConstants.MergeRollupTask so that
they pass the task config validation of the
- /// merge tasks (MergeRollupTask, RealtimeToOfflineSegmentsTask)
+ /// merge tasks (MergeRollupTask, RealtimeToOfflineSegmentsTask), and update
[#isBytesBacked] if the new aggregator
+ /// reads/writes the column value as `byte[]`.
public static ValueAggregator getValueAggregator(AggregationFunctionType
aggregationType, DataType dataType) {
switch (aggregationType) {
case MIN:
@@ -52,6 +83,8 @@ public class ValueAggregatorFactory {
return new MaxValueAggregator(dataType);
case SUM:
return new SumValueAggregator(dataType);
+ case AVG:
+ return new AvgValueAggregator();
case DISTINCTCOUNTHLL:
case DISTINCTCOUNTRAWHLL:
return new DistinctCountHLLAggregator();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java
new file mode 100644
index 00000000000..b72e277493c
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class AvgValueAggregatorTest {
+
+ private final Map<String, String> _functionParameters = new HashMap<>();
+ private AvgValueAggregator _aggregator;
+
+ @BeforeMethod
+ public void setUp() {
+ _aggregator = new AvgValueAggregator();
+ }
+
+ private static byte[] serialize(double sum, long count) {
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(new AvgPair(sum, count));
+ }
+
+ private AvgPair aggregate(byte[] value1, byte[] value2) {
+ byte[] result = (byte[]) _aggregator.aggregate(value1, value2,
_functionParameters);
+ return ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(result);
+ }
+
+ @Test
+ public void testAggregateAddsSumAndCount() {
+ // (sum=30, count=2) avg 15 merged with (sum=20, count=4) avg 5 -> total
sum 50, total count 6, avg 8.33...
+ AvgPair merged = aggregate(serialize(30.0, 2L), serialize(20.0, 4L));
+ assertEquals(merged.getSum(), 50.0);
+ assertEquals(merged.getCount(), 6L);
+ }
+
+ @Test
+ public void testTwoLevelRollupPreservesSumAndCount() {
+ // The average-of-averages trap: a second rollup pass over
already-rolled-up rows must keep the
+ // running sum/count, not re-average the level-1 averages.
+ // Level-1 group A: 50 rows of value 2 -> (sum=100, count=50), avg 2
+ // Level-1 group B: 50 rows of value 4 -> (sum=200, count=50), avg 4
+ byte[] level1A = serialize(100.0, 50L);
+ byte[] level1B = serialize(200.0, 50L);
+
+ AvgPair level2 = aggregate(level1A, level1B);
+ assertEquals(level2.getSum(), 300.0);
+ assertEquals(level2.getCount(), 100L);
+ // Correct overall average is 3.0 (300/100), NOT 3.0 by accident of
(2+4)/2 — verify via the pair.
+ assertEquals(level2.getSum() / level2.getCount(), 3.0);
+ }
+
+ @Test
+ public void testAggregateWithBothEmptyBytes() {
+ byte[] result = (byte[]) _aggregator.aggregate(new byte[0], new byte[0],
_functionParameters);
+ // Both empty (default null value for BYTES columns) -> a valid serialized
empty AvgPair (sum=0, count=0), not an
+ // empty byte[] that AvgPair deserialization (and the query-time AVG
function) cannot read.
+ AvgPair merged = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(result);
+ assertEquals(merged.getSum(), 0.0);
+ assertEquals(merged.getCount(), 0L);
+ }
+
+ @Test
+ public void testAggregateWithFirstEmptyBytes() {
+ byte[] value2 = serialize(42.0, 7L);
+ byte[] result = (byte[]) _aggregator.aggregate(new byte[0], value2,
_functionParameters);
+ // Should return the non-empty side as-is
+ assertEquals(result, value2);
+ }
+
+ @Test
+ public void testAggregateWithSecondEmptyBytes() {
+ byte[] value1 = serialize(42.0, 7L);
+ byte[] result = (byte[]) _aggregator.aggregate(value1, new byte[0],
_functionParameters);
+ // Should return the non-empty side as-is
+ assertEquals(result, value1);
+ }
+
+ @Test(expectedExceptions = java.nio.BufferUnderflowException.class)
+ public void testAggregateRejectsMalformedBytes() {
+ // A non-empty but malformed (too short) AvgPair buffer must fail loudly
rather than silently corrupt.
+ _aggregator.aggregate(serialize(1.0, 1L), new byte[]{1, 2, 3},
_functionParameters);
+ }
+
+ @Test
+ public void testFactoryReturnsAvgAggregator() {
+ ValueAggregator aggregator =
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.AVG,
DataType.BYTES);
+ assertTrue(aggregator instanceof AvgValueAggregator);
+ }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 13b4067761a..220269db7bd 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -166,6 +166,25 @@ public class MergeTaskUtils {
aggregationType, column, timeColumn);
}
+ /// Validates that a column configured with a bytes-backed rollup
aggregation (sketches, AvgPair, TDigest, ...) is
+ /// declared as a `BYTES` column in the schema. These aggregators read the
stored column value as a serialized
+ /// object, so a non-BYTES column would fail with a ClassCastException at
task runtime. No-op for other (numeric or
+ /// time-ordered) aggregation types. The given aggregation type must be
parseable (see
+ /// [AggregationFunctionType#getAggregationFunctionType(String)]).
+ public static void validateAggregationColumnType(Schema schema, String
column, String aggregationType) {
+ AggregationFunctionType aggregationFunctionType =
AggregationFunctionType.getAggregationFunctionType(
+ aggregationType);
+ if (!ValueAggregatorFactory.isBytesBacked(aggregationFunctionType)) {
+ return;
+ }
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ Preconditions.checkState(fieldSpec != null,
+ "Aggregation type: %s on column: %s requires the column to exist in
schema!", aggregationType, column);
+ Preconditions.checkState(fieldSpec.getDataType() ==
FieldSpec.DataType.BYTES,
+ "Aggregation type: %s on column: %s requires the column to be of type
BYTES in schema, but found: %s",
+ aggregationType, column, fieldSpec.getDataType());
+ }
+
/**
* Returns the segment config based on the task config.
* TODO - Ensure all tasks that build SegmentConfig use this method so that
all appropriate configs are set.
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 47c92a9f806..839d5823666 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -528,6 +528,7 @@ public class MergeRollupTaskGenerator extends
BaseTaskGenerator {
"Invalid aggregation type: " + entry.getValue() + " for column:
" + column, e);
}
MergeTaskUtils.validateOrderSensitiveAggregation(tableConfig, schema,
column, entry.getValue());
+ MergeTaskUtils.validateAggregationColumnType(schema, column,
entry.getValue());
}
}
// check no mis-configured aggregation function parameters
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index c6631dccc3c..2240974cec1 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -354,6 +354,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends
BaseTaskGenerator {
throw new IllegalStateException(err, e);
}
MergeTaskUtils.validateOrderSensitiveAggregation(tableConfig, schema,
column, entry.getValue());
+ MergeTaskUtils.validateAggregationColumnType(schema, column,
entry.getValue());
}
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java
new file mode 100644
index 00000000000..0443e95eb85
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.config.SchemaSerDeUtils;
+import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskTestUtils;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests MergeRollup task executor with AVG aggregation on a {@code BYTES}
column holding serialized
+ * {@link AvgPair} (sum + count) values.
+ * <p>
+ * AVG is correct across rollups only because the (sum, count) pair is
preserved and merged additively;
+ * averaging already-averaged scalars would be wrong whenever groups have
unequal counts. The tests below
+ * deliberately use unequal counts so an average-of-averages implementation
would fail them.
+ */
+public class MergeRollupAvgTaskExecutorTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"MergeRollupAvgTaskExecutorTest");
+ private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR,
"originalSegment");
+ private static final String TABLE_NAME = "avg_table";
+ private static final String DIMENSION_COL = "groupKey";
+ private static final String AVG_COL = "avg_metric";
+
+ private static final String GROUP_1 = "group1";
+ private static final String GROUP_2 = "group2";
+ private static final String GROUP_3 = "group3";
+
+ private TableConfig _tableConfig;
+ private Schema _schema;
+ private int _workingDirCounter = 0;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ _tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ _schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(DIMENSION_COL, FieldSpec.DataType.STRING)
+ .addMetric(AVG_COL, FieldSpec.DataType.BYTES)
+ .build();
+
+ MinionContext minionContext = MinionContext.getInstance();
+ //noinspection unchecked
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore =
Mockito.mock(ZkHelixPropertyStore.class);
+ Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/" + TABLE_NAME +
"_OFFLINE", null, AccessOption.PERSISTENT))
+ .thenReturn(TableConfigSerDeUtils.toZNRecord(_tableConfig));
+ Mockito.when(helixPropertyStore.get("/SCHEMAS/" + TABLE_NAME, null,
AccessOption.PERSISTENT))
+ .thenReturn(SchemaSerDeUtils.toZNRecord(_schema));
+ minionContext.setHelixPropertyStore(helixPropertyStore);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+
+ /**
+ * Three distinct dimension groups across four segments are merged
independently, each preserving the
+ * total sum and count (and therefore the correct average).
+ */
+ @Test
+ public void testMultipleDimensionGroupsIndependentMerge()
+ throws Exception {
+ List<File> segmentDirs = buildSegments(buildStandardSegmentData());
+ List<SegmentConversionResult> results = runExecutor(segmentDirs, null);
+
+ Assert.assertEquals(results.size(), 1);
+ File mergedSegment = results.get(0).getFile();
+
+ SegmentMetadataImpl metadata = new SegmentMetadataImpl(mergedSegment);
+ Assert.assertEquals(metadata.getTotalDocs(), 3);
+
+ Map<String, AvgPair> avgPairs = readMergedAvgPairs(mergedSegment);
+ Assert.assertEquals(avgPairs.size(), 3);
+
+ // group1: values [1..300] -> sum 45150, count 300, avg 150.5
+ assertAvg(avgPairs.get(GROUP_1), 45150.0, 300L, 150.5);
+ // group2: values [500..799] -> sum 194850, count 300, avg 649.5
+ assertAvg(avgPairs.get(GROUP_2), 194850.0, 300L, 649.5);
+ // group3: values [1..200] -> sum 20100, count 200, avg 100.5
+ assertAvg(avgPairs.get(GROUP_3), 20100.0, 200L, 100.5);
+ }
+
+ /**
+ * The same dimension key appearing in multiple segments has all its
AvgPairs merged.
+ */
+ @Test
+ public void testCrossSegmentMerging()
+ throws Exception {
+ List<File> segmentDirs = buildSegments(buildStandardSegmentData());
+ Map<String, AvgPair> avgPairs =
readMergedAvgPairs(runExecutor(segmentDirs, null).get(0).getFile());
+
+ // group1 came from 3 segments (100 + 100 + 100 = 300 values)
+ Assert.assertEquals(avgPairs.get(GROUP_1).getCount(), 300L);
+ // group2 came from 3 segments (100 + 100 + 100 = 300 values)
+ Assert.assertEquals(avgPairs.get(GROUP_2).getCount(), 300L);
+ // group3 came from 3 segments (50 + 100 + 50 = 200 values)
+ Assert.assertEquals(avgPairs.get(GROUP_3).getCount(), 200L);
+ }
+
+ /**
+ * Groups with unequal counts must merge by adding sums and counts, not by
averaging the per-segment
+ * averages. With counts 10 and 100, an average-of-averages implementation
would yield 77.5 instead of
+ * the correct 136.409...
+ */
+ @Test
+ public void testUnequalCountsAvoidsAverageOfAverages()
+ throws Exception {
+ List<List<GenericRow>> segments = new ArrayList<>();
+ // 10 values [1..10] -> sum 55, avg 5.5
+ segments.add(List.of(makeRow(GROUP_1, createAvgPair(1, 11))));
+ // 100 values [100..199] -> sum 14950, avg 149.5
+ segments.add(List.of(makeRow(GROUP_1, createAvgPair(100, 200))));
+
+ Map<String, AvgPair> avgPairs =
readMergedAvgPairs(runExecutor(buildSegments(segments), null).get(0).getFile());
+ AvgPair merged = avgPairs.get(GROUP_1);
+ Assert.assertEquals(merged.getSum(), 15005.0);
+ Assert.assertEquals(merged.getCount(), 110L);
+ Assert.assertEquals(merged.getSum() / merged.getCount(), 15005.0 / 110.0,
1e-9);
+ }
+
+ /**
+ * A second rollup pass over already-rolled-up (pre-aggregated) AvgPair
segments still yields the correct
+ * overall average. Uses unequal counts so average-of-averages would diverge
from the true value.
+ * <p>
+ * Level 1 merges [1..10] (count 10) and [11..30] (count 20) -> sum 465,
count 30.
+ * Level 2 merges that with [31..100] (count 70) -> sum 5050, count 100, avg
50.5 (the true avg of [1..100]).
+ */
+ @Test
+ public void testTwoLevelRollupPreservesAverage()
+ throws Exception {
+ List<List<GenericRow>> level1Input = new ArrayList<>();
+ level1Input.add(List.of(makeRow(GROUP_1, createAvgPair(1, 11)))); //
[1..10]
+ level1Input.add(List.of(makeRow(GROUP_1, createAvgPair(11, 31)))); //
[11..30]
+ File level1Merged = runExecutor(buildSegments(level1Input),
null).get(0).getFile();
+
+ // Verify the intermediate (level-1) AvgPair is preserved as (sum, count),
not collapsed to an average.
+ AvgPair level1 = readMergedAvgPairs(level1Merged).get(GROUP_1);
+ Assert.assertEquals(level1.getSum(), 465.0);
+ Assert.assertEquals(level1.getCount(), 30L);
+
+ List<File> level2Input = new ArrayList<>();
+ level2Input.add(level1Merged);
+ level2Input.addAll(buildSegments(List.of(List.of(makeRow(GROUP_1,
createAvgPair(31, 101)))))); // [31..100]
+
+ AvgPair finalPair = readMergedAvgPairs(runExecutor(level2Input,
null).get(0).getFile()).get(GROUP_1);
+ assertAvg(finalPair, 5050.0, 100L, 50.5);
+ }
+
+ /**
+ * An empty AvgPair byte array (the default null value for BYTES columns) is
treated as missing; the merge
+ * reflects only the non-empty input.
+ */
+ @Test
+ public void testEmptyAvgPairHandling()
+ throws Exception {
+ List<List<GenericRow>> segments = new ArrayList<>();
+ GenericRow emptyRow = new GenericRow();
+ emptyRow.putValue(DIMENSION_COL, GROUP_1);
+ emptyRow.putValue(AVG_COL, new byte[0]);
+ segments.add(List.of(emptyRow));
+ segments.add(List.of(makeRow(GROUP_1, createAvgPair(1, 101)))); //
[1..100] -> sum 5050, count 100
+
+ List<File> segmentDirs = buildSegments(segments);
+ List<SegmentConversionResult> results = runExecutor(segmentDirs, null);
+
+ SegmentMetadataImpl metadata = new
SegmentMetadataImpl(results.get(0).getFile());
+ Assert.assertEquals(metadata.getTotalDocs(), 1);
+
+ assertAvg(readMergedAvgPairs(results.get(0).getFile()).get(GROUP_1),
5050.0, 100L, 50.5);
+ }
+
+ /**
+ * After rollup, the total doc count must equal the number of distinct
dimension keys.
+ */
+ @Test
+ public void testSegmentDocCountEqualsDistinctKeys()
+ throws Exception {
+ List<File> segmentDirs = buildSegments(buildStandardSegmentData());
+ List<SegmentConversionResult> results = runExecutor(segmentDirs, null);
+
+ SegmentMetadataImpl metadata = new
SegmentMetadataImpl(results.get(0).getFile());
+ Assert.assertEquals(metadata.getTotalDocs(), 3,
+ "Rollup doc count should equal the number of distinct dimension keys");
+ }
+
+ private static void assertAvg(AvgPair avgPair, double expectedSum, long
expectedCount, double expectedAvg) {
+ Assert.assertNotNull(avgPair);
+ Assert.assertEquals(avgPair.getSum(), expectedSum);
+ Assert.assertEquals(avgPair.getCount(), expectedCount);
+ Assert.assertEquals(avgPair.getSum() / avgPair.getCount(), expectedAvg,
1e-9);
+ }
+
+ /**
+ * Creates an AvgPair accumulating the integer values in {@code [start,
end)}.
+ */
+ private static AvgPair createAvgPair(int start, int end) {
+ AvgPair avgPair = new AvgPair();
+ for (int v = start; v < end; v++) {
+ avgPair.apply(v);
+ }
+ return avgPair;
+ }
+
+ private static GenericRow makeRow(String dimensionValue, AvgPair avgPair) {
+ GenericRow row = new GenericRow();
+ row.putValue(DIMENSION_COL, dimensionValue);
+ row.putValue(AVG_COL, ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(avgPair));
+ return row;
+ }
+
+ private List<File> buildSegments(List<List<GenericRow>> segmentRows)
+ throws Exception {
+ List<File> segmentDirs = new ArrayList<>();
+ for (int i = 0; i < segmentRows.size(); i++) {
+ String segmentName = TABLE_NAME + "_seg" + i + "_" + System.nanoTime();
+ RecordReader recordReader = new
GenericRowRecordReader(segmentRows.get(i));
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig,
_schema);
+ config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(segmentName);
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, recordReader);
+ driver.build();
+ segmentDirs.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
+ }
+ return segmentDirs;
+ }
+
+ private List<SegmentConversionResult> runExecutor(List<File> segmentDirs,
Map<String, String> extraConfigs)
+ throws Exception {
+ File workingDir = new File(TEMP_DIR, "workingDir_" +
(_workingDirCounter++));
+ MergeRollupTaskExecutor executor = new MergeRollupTaskExecutor(new
MinionConf());
+
executor.setMinionEventObserver(MinionTaskTestUtils.getMinionProgressObserver());
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME + "_OFFLINE");
+ configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
+ configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup");
+ configs.put(AVG_COL +
MinionConstants.MergeTask.AGGREGATION_TYPE_KEY_SUFFIX, "avg");
+ if (extraConfigs != null) {
+ configs.putAll(extraConfigs);
+ }
+
+ PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
+ return executor.convert(pinotTaskConfig, segmentDirs, workingDir);
+ }
+
+ private Map<String, AvgPair> readMergedAvgPairs(File mergedSegmentDir)
+ throws Exception {
+ Map<String, AvgPair> result = new HashMap<>();
+ PinotSegmentRecordReader reader = new PinotSegmentRecordReader();
+ reader.init(mergedSegmentDir, null, null, true);
+ while (reader.hasNext()) {
+ GenericRow row = reader.next();
+ String key = (String) row.getValue(DIMENSION_COL);
+ byte[] bytes = (byte[]) row.getValue(AVG_COL);
+ result.put(key, ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes));
+ }
+ reader.close();
+ return result;
+ }
+
+ private static List<List<GenericRow>> buildStandardSegmentData() {
+ List<List<GenericRow>> segments = new ArrayList<>();
+
+ // Segment 0: group1=[1..100], group2=[500..599]
+ segments.add(List.of(makeRow(GROUP_1, createAvgPair(1, 101)),
makeRow(GROUP_2, createAvgPair(500, 600))));
+ // Segment 1: group1=[101..200], group2=[600..699], group3=[1..50]
+ segments.add(List.of(makeRow(GROUP_1, createAvgPair(101, 201)),
makeRow(GROUP_2, createAvgPair(600, 700)),
+ makeRow(GROUP_3, createAvgPair(1, 51))));
+ // Segment 2: group1=[201..300], group3=[51..150]
+ segments.add(List.of(makeRow(GROUP_1, createAvgPair(201, 301)),
makeRow(GROUP_3, createAvgPair(51, 151))));
+ // Segment 3: group2=[700..799], group3=[151..200]
+ segments.add(List.of(makeRow(GROUP_2, createAvgPair(700, 800)),
makeRow(GROUP_3, createAvgPair(151, 201))));
+
+ return segments;
+ }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 2be88125942..a07caca15ca 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -182,7 +182,7 @@ public class MergeRollupTaskGeneratorTest {
() -> taskGenerator.validateTaskConfigs(tableConfigWithTimeColumn,
schema, taskConfig));
// Parseable aggregation type without an available value aggregator should
fail the validation
- taskConfig.put("c.aggregationType", "avg");
+ taskConfig.put("c.aggregationType", "distinctCount");
assertThrows(IllegalStateException.class,
() -> taskGenerator.validateTaskConfigs(tableConfigWithTimeColumn,
schema, taskConfig));
taskConfig.put("c.aggregationType", "lastWithTime");
@@ -216,6 +216,44 @@ public class MergeRollupTaskGeneratorTest {
() -> taskGenerator.validateTaskConfigs(tableConfigWithTimeColumn,
schema, missingColumnConfig));
}
+  /**
+   * Checks that {@code validateTaskConfigs} ties bytes-backed aggregation types to the column data type: they pass on
+   * a BYTES metric, are rejected with {@link IllegalStateException} on a LONG metric, and sum/max on the LONG metric
+   * still pass.
+   */
+  @Test
+  public void testBytesBackedAggregationColumnTypeValidation() {
+    String[] bytesBackedTypes = {"avg", "percentileTDigest", "distinctCountHLL"};
+    String[] scalarTypes = {"sum", "max"};
+
+    MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
+
+    // One BYTES metric, one LONG metric, a dimension and the time column.
+    Schema schema = new Schema();
+    schema.addField(new MetricFieldSpec("bytesCol", FieldSpec.DataType.BYTES));
+    schema.addField(new MetricFieldSpec("longCol", FieldSpec.DataType.LONG));
+    schema.addField(new DimensionFieldSpec("d", FieldSpec.DataType.STRING, true));
+    schema.addField(
+        new DateTimeFieldSpec(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS"));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .build();
+
+    // Bytes-backed aggregations on a BYTES column are valid (validation must not throw)
+    for (String type : bytesBackedTypes) {
+      Map<String, String> accepted = new HashMap<>();
+      accepted.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
+      accepted.put("bytesCol.aggregationType", type);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, accepted);
+    }
+
+    // The same bytes-backed aggregations on a non-BYTES (LONG) column must fail at config time
+    for (String type : bytesBackedTypes) {
+      Map<String, String> rejected = new HashMap<>();
+      rejected.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
+      rejected.put("longCol.aggregationType", type);
+      assertThrows(IllegalStateException.class,
+          () -> taskGenerator.validateTaskConfigs(tableConfig, schema, rejected));
+    }
+
+    // Non-bytes-backed aggregations on a numeric column remain valid
+    for (String type : scalarTypes) {
+      Map<String, String> accepted = new HashMap<>();
+      accepted.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
+      accepted.put("longCol.aggregationType", type);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, accepted);
+    }
+  }
+
@Test
public void testInvalidAggregationFunctionFieldName() {
MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 2f40da4782a..06357e07345 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -32,12 +32,15 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.utils.config.SchemaSerDeUtils;
import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.plugin.minion.tasks.MinionTaskTestUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.customobject.AvgPair;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -78,14 +81,17 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
private static final String TABLE_NAME_WITH_SORTED_COL =
"testTableWithSortedCol_OFFLINE";
private static final String TABLE_NAME_EPOCH_HOURS =
"testTableEpochHours_OFFLINE";
private static final String TABLE_NAME_SDF = "testTableSDF_OFFLINE";
+ private static final String TABLE_NAME_AVG = "testTableAvg_OFFLINE";
private static final String D1 = "d1";
private static final String M1 = "m1";
+ private static final String M_AVG = "mavg";
private static final String T = "t";
private static final String T_TRX = "t_trx";
private List<File> _segmentIndexDirList;
private List<File> _segmentIndexDirListEpochHours;
private List<File> _segmentIndexDirListSDF;
+ private List<File> _segmentIndexDirListAvg;
@BeforeClass
public void setUp()
@@ -125,6 +131,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
.addMetric(M1, FieldSpec.DataType.INT)
.addDateTime(T_TRX, FieldSpec.DataType.INT,
"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS").build();
+ TableConfig tableConfigAvg =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_AVG).setTimeColumnName(T).build();
+ Schema schemaAvg =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME_AVG).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
+ .addMetric(M_AVG, FieldSpec.DataType.BYTES)
+ .addDateTime(T, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:MILLISECONDS").build();
List<String> d1 = Lists.newArrayList("foo", "bar", "foo", "foo", "bar");
List<List<GenericRow>> rows = new ArrayList<>(NUM_SEGMENTS);
@@ -191,6 +203,29 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
_segmentIndexDirListSDF.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
}
+ // create test segments with a BYTES column holding serialized AvgPair
(sum + count) for AVG rollup.
+ // Two segments share dimension key "a" within the same day bucket, with
unequal counts (10 and 100) so a
+ // correct merge must add sums and counts rather than average the
per-segment averages.
+ _segmentIndexDirListAvg = new ArrayList<>();
+ int[][] avgRanges = {{1, 11}, {100, 200}};
+ for (int i = 0; i < avgRanges.length; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue(D1, "a");
+ row.putValue(M_AVG, createAvgPairBytes(avgRanges[i][0],
avgRanges[i][1]));
+ row.putValue(T, 1600473600000L);
+ String segmentName = "segmentAvg_" + i;
+ RecordReader recordReader = new
GenericRowRecordReader(Collections.singletonList(row));
+ SegmentGeneratorConfig config = new
SegmentGeneratorConfig(tableConfigAvg, schemaAvg);
+ config.setInstanceType(InstanceType.MINION);
+ config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
+ config.setTableName(TABLE_NAME_AVG);
+ config.setSegmentName(segmentName);
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, recordReader);
+ driver.build();
+ _segmentIndexDirListAvg.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
+ }
+
MinionContext minionContext = MinionContext.getInstance();
@SuppressWarnings("unchecked")
ZkHelixPropertyStore<ZNRecord> helixPropertyStore =
Mockito.mock(ZkHelixPropertyStore.class);
@@ -215,6 +250,10 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
.thenReturn(SchemaSerDeUtils.toZNRecord(schemaEpochHours));
Mockito.when(helixPropertyStore.get("/SCHEMAS/testTableSDF", null,
AccessOption.PERSISTENT))
.thenReturn(SchemaSerDeUtils.toZNRecord(schemaSDF));
+ Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/" + TABLE_NAME_AVG,
null, AccessOption.PERSISTENT))
+ .thenReturn(TableConfigSerDeUtils.toZNRecord(tableConfigAvg));
+ Mockito.when(helixPropertyStore.get("/SCHEMAS/testTableAvg", null,
AccessOption.PERSISTENT))
+ .thenReturn(SchemaSerDeUtils.toZNRecord(schemaAvg));
minionContext.setHelixPropertyStore(helixPropertyStore);
}
@@ -339,6 +378,61 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
assertEquals(columnMetadataForM1.getMaxValue(), 3);
}
+  /**
+   * Runs a rollup conversion with {@code avg} aggregation on the BYTES metric over the AVG test segments and checks
+   * that the two input rows collapse into a single row whose AvgPair carries the added sums and added counts.
+   */
+  @Test
+  public void testRollupWithAvgAggregation()
+      throws Exception {
+    FileUtils.deleteQuietly(WORKING_DIR);
+
+    RealtimeToOfflineSegmentsTaskExecutor executor = new RealtimeToOfflineSegmentsTaskExecutor(null, null);
+    executor.setMinionEventObserver(MinionTaskTestUtils.getMinionProgressObserver());
+
+    // One-day window with 1d bucketing and rollup merge; the BYTES metric is rolled up with "avg".
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_AVG);
+    taskConfigs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    taskConfigs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
+    taskConfigs.put(MinionConstants.RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1d");
+    taskConfigs.put(MinionConstants.RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, "rollup");
+    taskConfigs.put(M_AVG + MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "avg");
+
+    List<SegmentConversionResult> results = executor.convert(
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs),
+        _segmentIndexDirListAvg, WORKING_DIR);
+
+    assertEquals(results.size(), 1);
+    File mergedSegmentDir = results.get(0).getFile();
+    // Single (d1=a, day-bucket) group -> the two AvgPairs (counts 10 and 100) merge into one row
+    assertEquals(new SegmentMetadataImpl(mergedSegmentDir).getTotalDocs(), 1);
+
+    AvgPair mergedPair = readAvgPair(mergedSegmentDir);
+    // Sum and count are added (not averaged): 55 + 14950 = 15005 over 10 + 100 = 110.
+    // Average-of-averages would wrongly give (5.5 + 149.5) / 2 = 77.5 instead of ~136.41.
+    assertEquals(mergedPair.getSum(), 15005.0);
+    assertEquals(mergedPair.getCount(), 110L);
+  }
+
+  /**
+   * Creates an AvgPair, applies every integer in the half-open range {@code [start, end)} to it one by one, and
+   * returns the pair serialized with {@code ObjectSerDeUtils.AVG_PAIR_SER_DE}.
+   *
+   * @param start first value applied (inclusive)
+   * @param end upper bound of the values applied (exclusive); nothing is applied when {@code end <= start}
+   * @return the serialized AvgPair bytes
+   */
+  private static byte[] createAvgPairBytes(int start, int end) {
+    AvgPair pair = new AvgPair();
+    int value = start;
+    while (value < end) {
+      pair.apply(value);
+      value++;
+    }
+    byte[] serialized = ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(pair);
+    return serialized;
+  }
+
+  /**
+   * Reads the first row of the given segment and deserializes its {@code M_AVG} BYTES value into an AvgPair.
+   *
+   * <p>The reader is managed by try-with-resources and initialized inside that block, so it is closed both on the
+   * normal path and when {@code init}, the assertion, or the read throws.
+   *
+   * @param segmentDir directory of the segment to read
+   * @return the AvgPair stored in the first row of the segment
+   * @throws Exception if the reader cannot be initialized, read, or closed
+   */
+  private static AvgPair readAvgPair(File segmentDir)
+      throws Exception {
+    try (PinotSegmentRecordReader reader = new PinotSegmentRecordReader()) {
+      reader.init(segmentDir, null, null, true);
+      // The segment must hold at least one row; fail the test here rather than inside next().
+      Assert.assertTrue(reader.hasNext());
+      GenericRow row = reader.next();
+      return ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize((byte[]) row.getValue(M_AVG));
+    }
+  }
+
@Test
public void testTablePartitioning()
throws Exception {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 1943fdbb9c9..fee0114cf9d 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -524,6 +524,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addMetric("bytesCol", FieldSpec.DataType.BYTES)
.addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
@@ -626,16 +627,17 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
}
- // valid agg
+ // valid agg: distinctCountHLL is bytes-backed, so it requires a BYTES
column
HashMap<String, String> validAggConfig = new
HashMap<>(realtimeToOfflineTaskConfig);
- validAggConfig.put("myCol.aggregationType", "distinctCountHLL");
+ validAggConfig.put("bytesCol.aggregationType", "distinctCountHLL");
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
new TableTaskConfig(
Map.of("RealtimeToOfflineSegmentsTask", validAggConfig,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
taskGenerator.validateTaskConfigs(tableConfig, schema, validAggConfig);
- // valid agg
+ // valid agg: distinctCountHLLPlus is allow-listed but has no merge value
aggregator, so it is not bytes-backed
+ // and is not subject to the BYTES column requirement
HashMap<String, String> validAgg2Config = new
HashMap<>(realtimeToOfflineTaskConfig);
validAgg2Config.put("myCol.aggregationType", "distinctCountHLLPlus");
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
@@ -694,6 +696,44 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
}
}
+  /**
+   * Checks that {@code validateTaskConfigs} ties bytes-backed aggregation types to the column data type for rollup
+   * configs: they pass on a BYTES metric, are rejected with {@link IllegalStateException} on a LONG metric, and
+   * sum/max on the LONG metric still pass.
+   */
+  @Test
+  public void testBytesBackedAggregationColumnTypeValidation() {
+    String[] bytesBackedTypes = {"avg", "percentileTDigest", "distinctCountHLL"};
+    String[] scalarTypes = {"sum", "max"};
+
+    RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
+
+    // One BYTES metric, one LONG metric, a dimension and the time column.
+    Schema.SchemaBuilder schemaBuilder = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension("d", FieldSpec.DataType.STRING)
+        .addMetric("bytesCol", FieldSpec.DataType.BYTES)
+        .addMetric("longCol", FieldSpec.DataType.LONG)
+        .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS");
+    Schema schema = schemaBuilder.build();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .build();
+
+    // Bytes-backed aggregations on a BYTES column are valid (validation must not throw)
+    for (String type : bytesBackedTypes) {
+      Map<String, String> accepted = new HashMap<>();
+      accepted.put("mergeType", "rollup");
+      accepted.put("bytesCol.aggregationType", type);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, accepted);
+    }
+
+    // The same bytes-backed aggregations on a non-BYTES (LONG) column must fail at config time
+    for (String type : bytesBackedTypes) {
+      Map<String, String> rejected = new HashMap<>();
+      rejected.put("mergeType", "rollup");
+      rejected.put("longCol.aggregationType", type);
+      Assert.assertThrows(IllegalStateException.class,
+          () -> taskGenerator.validateTaskConfigs(tableConfig, schema, rejected));
+    }
+
+    // Non-bytes-backed aggregations on a numeric column remain valid
+    for (String type : scalarTypes) {
+      Map<String, String> accepted = new HashMap<>();
+      accepted.put("mergeType", "rollup");
+      accepted.put("longCol.aggregationType", type);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, accepted);
+    }
+  }
+
private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status
status, long startTime, long endTime,
TimeUnit timeUnit, String downloadURL) {
SegmentZKMetadata realtimeSegmentZKMetadata = new
SegmentZKMetadata(segmentName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]