This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 442c0fc713 Configurable sketch accuracy in merge rollup task (#14373)
442c0fc713 is described below
commit 442c0fc71343d8f8ff152a818867f03d240bb758
Author: David Cromberge <[email protected]>
AuthorDate: Tue Dec 10 11:05:46 2024 +0200
Configurable sketch accuracy in merge rollup task (#14373)
* Configurable sketch accuracy in merge rollup task
* Run mvn spotless:apply
---
.../apache/pinot/core/common/MinionConstants.java | 1 +
.../DistinctCountCPCSketchAggregator.java | 3 ++-
.../aggregator/DistinctCountHLLAggregator.java | 3 ++-
.../DistinctCountThetaSketchAggregator.java | 24 ++++++++++++-----
.../aggregator/DistinctCountULLAggregator.java | 3 ++-
.../aggregator/IntegerTupleSketchAggregator.java | 21 +++++++++++++--
.../processing/aggregator/MaxValueAggregator.java | 3 ++-
.../processing/aggregator/MinValueAggregator.java | 3 ++-
.../processing/aggregator/SumValueAggregator.java | 3 ++-
.../processing/aggregator/ValueAggregator.java | 5 +++-
.../framework/SegmentProcessorConfig.java | 23 +++++++++++++++--
.../segment/processing/reducer/ReducerFactory.java | 3 ++-
.../segment/processing/reducer/RollupReducer.java | 20 +++++++++++----
.../pinot/plugin/minion/tasks/MergeTaskUtils.java | 22 ++++++++++++++++
.../tasks/mergerollup/MergeRollupTaskExecutor.java | 4 +++
.../tasks/mergerollup/MergeRollupTaskUtils.java | 15 +++++++++++
.../RealtimeToOfflineSegmentsTaskExecutor.java | 4 +++
.../plugin/minion/tasks/MergeTaskUtilsTest.java | 30 +++++++++++++++++-----
.../mergerollup/MergeRollupTaskUtilsTest.java | 24 +++++++++++++++++
19 files changed, 185 insertions(+), 29 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 3c4b9b322f..48349099b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -103,6 +103,7 @@ public class MinionConstants {
// Merge config
public static final String MERGE_TYPE_KEY = "mergeType";
public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
+ public static final String AGGREGATION_FUNCTION_PARAMETERS_PREFIX = "aggregationFunctionParameters.";
public static final String MODE = "mode";
public static final String PROCESS_FROM_WATERMARK_MODE = "processFromWatermark";
public static final String PROCESS_ALL_MODE = "processAll";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
index 82e9a74161..73985f564d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.cpc.CpcUnion;
import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -30,7 +31,7 @@ public class DistinctCountCPCSketchAggregator implements ValueAggregator {
}
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
CpcSketch first = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
CpcSketch second = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
CpcSketch result;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
index 4eecbe3696..940d356a95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
@@ -20,12 +20,13 @@ package org.apache.pinot.core.segment.processing.aggregator;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.Map;
import org.apache.pinot.core.common.ObjectSerDeUtils;
public class DistinctCountHLLAggregator implements ValueAggregator {
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
try {
HyperLogLog first = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value1);
HyperLogLog second = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value2);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
index b11f7d7b00..f22e38ed3c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
@@ -18,26 +18,38 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.utils.CommonConstants;
public class DistinctCountThetaSketchAggregator implements ValueAggregator {
- private final Union _union;
-
public DistinctCountThetaSketchAggregator() {
- // TODO: Handle configurable nominal entries
- _union = Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion();
}
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
+ String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
+
+ int sketchNominalEntries;
+
+ // Check if nominal entries values match
+ if (nominalEntriesParam != null) {
+ sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+ } else {
+ // If the functionParameters don't have an explicit nominal entries value set,
+ // use the default value for nominal entries
+ sketchNominalEntries = CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
+ }
+
+ Union union = Union.builder().setNominalEntries(sketchNominalEntries).buildUnion();
Sketch first = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
Sketch second = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
- Sketch result = _union.union(first, second);
+ Sketch result = union.union(first, second);
return ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(result);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
index 2a51ac052b..70469f8cf4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
@@ -19,12 +19,13 @@
package org.apache.pinot.core.segment.processing.aggregator;
import com.dynatrace.hash4j.distinctcount.UltraLogLog;
+import java.util.Map;
import org.apache.pinot.core.common.ObjectSerDeUtils;
public class DistinctCountULLAggregator implements ValueAggregator {
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
UltraLogLog first = ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value1);
UltraLogLog second = ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value2);
// add to the one with a larger P and return that
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
index 8bdf7f8a86..b7df4c05fe 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.spi.utils.CommonConstants;
public class IntegerTupleSketchAggregator implements ValueAggregator {
@@ -33,10 +36,24 @@ public class IntegerTupleSketchAggregator implements ValueAggregator {
}
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
+ String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
+
+ int sketchNominalEntries;
+
+ // Check if nominal entries values match
+ if (nominalEntriesParam != null) {
+ sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+ } else {
+ // If the functionParameters don't have an explicit nominal entries value set,
+ // use the default value for nominal entries
+ sketchNominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ }
+
Sketch<IntegerSummary> first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
Sketch<IntegerSummary> second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
- Sketch<IntegerSummary> result = new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(first, second);
+ Sketch<IntegerSummary> result =
+ new Union<>(sketchNominalEntries, new IntegerSummarySetOperations(_mode, _mode)).union(first, second);
return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
index 6a231b036c..1c4fa5a498 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec;
@@ -33,7 +34,7 @@ public class MaxValueAggregator implements ValueAggregator {
}
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
Object result;
switch (_dataType) {
case INT:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
index 9352cc99d0..8914dfa7c8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec;
@@ -33,7 +34,7 @@ public class MinValueAggregator implements ValueAggregator {
}
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
Object result;
switch (_dataType) {
case INT:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
index 0570cca1b5..8b7d57d889 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec;
@@ -33,7 +34,7 @@ public class SumValueAggregator implements ValueAggregator {
}
@Override
- public Object aggregate(Object value1, Object value2) {
+ public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
Object result;
switch (_dataType) {
case INT:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
index 016e0fb091..70d90dd100 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import java.util.Map;
+
+
/**
* Interface for value aggregator
*/
@@ -27,5 +30,5 @@ public interface ValueAggregator {
* Given two values, return the aggregated value
* @return aggregated value given two column values
*/
- Object aggregate(Object value1, Object value2);
+ Object aggregate(Object value1, Object value2, Map<String, String> functionParameters);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
index 053f78b6f3..56009608ee 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
@@ -44,12 +44,14 @@ public class SegmentProcessorConfig {
private final List<PartitionerConfig> _partitionerConfigs;
private final MergeType _mergeType;
private final Map<String, AggregationFunctionType> _aggregationTypes;
+ private final Map<String, Map<String, String>> _aggregationFunctionParameters;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;
private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig,
List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
- Map<String, AggregationFunctionType> aggregationTypes, SegmentConfig segmentConfig,
+ Map<String, AggregationFunctionType> aggregationTypes,
+ Map<String, Map<String, String>> aggregationFunctionParameters, SegmentConfig segmentConfig,
Consumer<Object> progressObserver) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
_tableConfig = tableConfig;
@@ -58,6 +60,7 @@ public class SegmentProcessorConfig {
_partitionerConfigs = partitionerConfigs;
_mergeType = mergeType;
_aggregationTypes = aggregationTypes;
+ _aggregationFunctionParameters = aggregationFunctionParameters;
_segmentConfig = segmentConfig;
_progressObserver = (progressObserver != null) ? progressObserver : p -> {
// Do nothing.
@@ -106,6 +109,13 @@ public class SegmentProcessorConfig {
return _aggregationTypes;
}
+ /**
+ * The aggregation function parameters for the SegmentProcessorFramework's reduce phase with ROLLUP merge type
+ */
+ public Map<String, Map<String, String>> getAggregationFunctionParameters() {
+ return _aggregationFunctionParameters;
+ }
+
/**
* The SegmentConfig for the SegmentProcessorFramework's reduce phase
*/
@@ -134,6 +144,7 @@ public class SegmentProcessorConfig {
private List<PartitionerConfig> _partitionerConfigs;
private MergeType _mergeType;
private Map<String, AggregationFunctionType> _aggregationTypes;
+ private Map<String, Map<String, String>> _aggregationFunctionParameters;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;
@@ -167,6 +178,11 @@ public class SegmentProcessorConfig {
return this;
}
+ public Builder setAggregationFunctionParameters(Map<String, Map<String, String>> aggregationFunctionParameters) {
+ _aggregationFunctionParameters = aggregationFunctionParameters;
+ return this;
+ }
+
public Builder setSegmentConfig(SegmentConfig segmentConfig) {
_segmentConfig = segmentConfig;
return this;
@@ -193,11 +209,14 @@ public class SegmentProcessorConfig {
if (_aggregationTypes == null) {
_aggregationTypes = Collections.emptyMap();
}
+ if (_aggregationFunctionParameters == null) {
+ _aggregationFunctionParameters = Collections.emptyMap();
+ }
if (_segmentConfig == null) {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType,
- _aggregationTypes, _segmentConfig, _progressObserver);
+ _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
index a205500e34..59fca478fa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
@@ -38,7 +38,8 @@ public class ReducerFactory {
case CONCAT:
return new ConcatReducer(fileManager);
case ROLLUP:
- return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(), reducerOutputDir);
+ return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(),
+ processorConfig.getAggregationFunctionParameters(), reducerOutputDir);
case DEDUP:
return new DedupReducer(partitionId, fileManager, reducerOutputDir);
default:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
index ae88120f20..fdd1a67173 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.processing.reducer;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@@ -47,14 +48,17 @@ public class RollupReducer implements Reducer {
private final String _partitionId;
private final GenericRowFileManager _fileManager;
private final Map<String, AggregationFunctionType> _aggregationTypes;
+ private final Map<String, Map<String, String>> _aggregationFunctionParameters;
private final File _reducerOutputDir;
private GenericRowFileManager _rollupFileManager;
public RollupReducer(String partitionId, GenericRowFileManager fileManager,
- Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) {
+ Map<String, AggregationFunctionType> aggregationTypes,
+ Map<String, Map<String, String>> aggregationFunctionParameters, File reducerOutputDir) {
_partitionId = partitionId;
_fileManager = fileManager;
_aggregationTypes = aggregationTypes;
+ _aggregationFunctionParameters = aggregationFunctionParameters;
_reducerOutputDir = reducerOutputDir;
}
@@ -91,7 +95,8 @@ public class RollupReducer implements Reducer {
for (FieldSpec fieldSpec : fieldSpecs) {
if (fieldSpec.getFieldType() == FieldType.METRIC) {
aggregatorContextList.add(new AggregatorContext(fieldSpec,
- _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE)));
+ _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE),
+ _aggregationFunctionParameters.getOrDefault(fieldSpec.getName(), Collections.emptyMap())));
}
}
@@ -159,7 +164,8 @@ public class RollupReducer implements Reducer {
} else {
// Non-null field, aggregate the value
aggregatedRow.putValue(column,
- aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column)));
+ aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column),
+ aggregatorContext._functionParameters));
}
}
}
@@ -169,17 +175,21 @@ public class RollupReducer implements Reducer {
for (AggregatorContext aggregatorContext : aggregatorContextList) {
String column = aggregatorContext._column;
aggregatedRow.putValue(column,
- aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column)));
+ aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column),
+ aggregatorContext._functionParameters));
}
}
private static class AggregatorContext {
final String _column;
final ValueAggregator _aggregator;
+ final Map<String, String> _functionParameters;
- AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType) {
+ AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType,
+ Map<String, String> functionParameters) {
_column = fieldSpec.getName();
_aggregator = ValueAggregatorFactory.getValueAggregator(aggregationType, fieldSpec.getDataType());
+ _functionParameters = functionParameters;
}
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 34bf2e5ecf..43f951629b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -137,6 +137,28 @@ public class MergeTaskUtils {
return aggregationTypes;
}
+ /**
+ * Returns a map from column name to the aggregation function parameters associated with it based on the task config.
+ */
+ public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) {
+ Map<String, Map<String, String>> aggregationFunctionParameters = new HashMap<>();
+ String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
+
+ for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(prefix)) {
+ String[] parts = key.substring(prefix.length()).split("\\.", 2);
+ if (parts.length == 2) {
+ String metricColumn = parts[0];
+ String paramName = parts[1];
+ aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new HashMap<>()).put(paramName, value);
+ }
+ }
+ }
+ return aggregationFunctionParameters;
+ }
+
/**
* Returns the segment config based on the task config.
*/
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
index bebedb3ff2..455849f648 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
@@ -91,6 +91,10 @@ public class MergeRollupTaskExecutor extends
BaseMultipleSegmentsConversionExecu
// Aggregation types
segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
+ // Aggregation function parameters
+ segmentProcessorConfigBuilder.setAggregationFunctionParameters(
+ MergeTaskUtils.getAggregationFunctionParameters(configs));
+
// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
index c3c7720e1b..b040b54d53 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.MergeTask;
@@ -51,12 +53,25 @@ public class MergeRollupTaskUtils {
*/
public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, String> taskConfig) {
Map<String, Map<String, String>> levelToConfigMap = new TreeMap<>();
+
+ // Regex to match aggregation function parameter keys
+ Pattern pattern = Pattern.compile("(\\w+)\\.aggregationFunctionParameters\\.(\\w+)\\.(\\w+)");
+
for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
String key = entry.getKey();
for (String configKey : VALID_CONFIG_KEYS) {
if (key.endsWith(configKey)) {
String level = key.substring(0, key.length() - configKey.length() - 1);
levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(configKey, entry.getValue());
+ } else {
+ Matcher matcher = pattern.matcher(key);
+ if (matcher.matches()) {
+ String level = matcher.group(1).trim(); // e.g., "1day" or "1hour"
+ String metric = matcher.group(2).trim(); // e.g., "metricColumnA" or "metricColumnB"
+ String param = matcher.group(3).trim(); // e.g., "nominalEntries" or "p"
+ String metricParam = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + metric + "." + param;
+ levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(metricParam, entry.getValue());
+ }
}
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index 502fa1cc76..bb1fc70afa 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -149,6 +149,10 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends
BaseMultipleSegmentsC
// Aggregation types
segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
+ // Aggregation function parameters
+ segmentProcessorConfigBuilder.setAggregationFunctionParameters(
+ MergeTaskUtils.getAggregationFunctionParameters(configs));
+
// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
index 731607784b..c60b86899e 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
@@ -49,8 +49,9 @@ public class MergeTaskUtilsTest {
public void testGetTimeHandlerConfig() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build();
- Schema schema = new Schema.SchemaBuilder()
- .addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", "1:SECONDS").build();
+ Schema schema =
+ new Schema.SchemaBuilder().addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss",
+ "1:SECONDS").build();
+ "1:SECONDS").build();
Map<String, String> taskConfig = new HashMap<>();
long expectedWindowStartMs = 1625097600000L;
long expectedWindowEndMs = 1625184000000L;
@@ -171,6 +172,23 @@ public class MergeTaskUtilsTest {
}
}
+ @Test
+ public void testGetAggregationFunctionParameters() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param1", "value1");
+ taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param2", "value2");
+ taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnB.param1", "value3");
+ taskConfig.put("otherPrefix.metricColumnC.param1", "value1");
+ taskConfig.put("aggregationFunction.metricColumnD.param2", "value2");
+ Map<String, Map<String, String>> result = MergeTaskUtils.getAggregationFunctionParameters(taskConfig);
+ assertEquals(result.size(), 2);
+ assertTrue(result.containsKey("metricColumnA"));
+ assertTrue(result.containsKey("metricColumnB"));
+ assertEquals(result.get("metricColumnA").get("param1"), "value1");
+ assertEquals(result.get("metricColumnA").get("param2"), "value2");
+ assertEquals(result.get("metricColumnB").get("param1"), "value3");
+ }
+
@Test
public void testGetSegmentConfig() {
Map<String, String> taskConfig = new HashMap<>();
@@ -206,12 +224,12 @@ public class MergeTaskUtilsTest {
segmentZKMetadata.setCustomMap(Collections.emptyMap());
assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata));
- segmentZKMetadata
- .setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "false"));
+ segmentZKMetadata.setCustomMap(
+ Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "false"));
assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata));
- segmentZKMetadata
- .setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "true"));
+ segmentZKMetadata.setCustomMap(
+ Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "true"));
assertFalse(MergeTaskUtils.allowMerge(segmentZKMetadata));
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
index f73559728f..90827e2fa9 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
@@ -98,4 +98,28 @@ public class MergeRollupTaskUtilsTest {
assertTrue(result.contains("dimension2"), "Expected set to contain 'dimension2'");
assertTrue(result.contains("dimension3"), "Expected set to contain 'dimension3'");
}
+
+ @Test
+ public void testAggregationFunctionParameters() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put("hourly.aggregationFunctionParameters.metricColumnA.nominalEntries", "16384");
+ taskConfig.put("hourly.aggregationFunctionParameters.metricColumnB.nominalEntries", "8192");
+ taskConfig.put("daily.aggregationFunctionParameters.metricColumnA.nominalEntries", "8192");
+ taskConfig.put("daily.aggregationFunctionParameters.metricColumnB.nominalEntries", "4096");
+
+ Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
+ assertEquals(levelToConfigMap.size(), 2);
+
+ Map<String, String> hourlyConfig = levelToConfigMap.get("hourly");
+ assertNotNull(hourlyConfig);
+ assertEquals(hourlyConfig.size(), 2);
+ assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"), "16384");
+ assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"), "8192");
+
+ Map<String, String> dailyConfig = levelToConfigMap.get("daily");
+ assertNotNull(dailyConfig);
+ assertEquals(dailyConfig.size(), 2);
+ assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"), "8192");
+ assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"), "4096");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]