This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4588f8ca34 Config for max output segment size in UpsertCompactMerge task (#14742)
4588f8ca34 is described below
commit 4588f8ca345fb3c3740f463409b5a77e8bdc378c
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Tue Jan 7 15:52:54 2025 +0530
Config for max output segment size in UpsertCompactMerge task (#14742)
* Config for max output segment size in UpsertCompactMerge task
* address comments
---
.../apache/pinot/core/common/MinionConstants.java | 10 ++++
.../UpsertCompactMergeTaskGenerator.java | 58 ++++++++++++++++------
.../UpsertCompactMergeTaskGeneratorTest.java | 12 ++---
3 files changed, 60 insertions(+), 20 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 7a276d6254..24db1f9ede 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -282,6 +282,16 @@ public class MinionConstants {
*/
public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY =
"maxNumSegmentsPerTask";
+ /**
+ * maximum size of output segments to produce
+ */
+ public static final String OUTPUT_SEGMENT_MAX_SIZE_KEY =
"outputSegmentMaxSize";
+
+ /**
+ * default output segment size
+ */
+ public static final String DEFAULT_OUTPUT_SEGMENT_MAX_SIZE = "200MB";
+
/**
* default maximum number of segments to process in a single task
*/
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index ae3a4aa0d8..dd7bf28353 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -47,6 +47,7 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,11 +64,14 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
private final SegmentZKMetadata _segmentZKMetadata;
private final long _validDocIds;
private final long _invalidDocIds;
+ private final double _segmentSizeInBytes;
- SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long
validDocIds, long invalidDocIds) {
+ SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long
validDocIds, long invalidDocIds,
+ double segmentSizeInBytes) {
_segmentZKMetadata = segmentZKMetadata;
_validDocIds = validDocIds;
_invalidDocIds = invalidDocIds;
+ _segmentSizeInBytes = segmentSizeInBytes;
}
public SegmentZKMetadata getSegmentZKMetadata() {
@@ -81,6 +85,10 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
public long getInvalidDocIds() {
return _invalidDocIds;
}
+
+ public double getSegmentSizeInBytes() {
+ return _segmentSizeInBytes;
+ }
}
public static class SegmentSelectionResult {
@@ -226,6 +234,27 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
Set<String> alreadyMergedSegments) {
Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge
= new HashMap<>();
Set<String> segmentsForDeletion = new HashSet<>();
+
+ // task config thresholds
+ long validDocsThreshold = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
+ long maxRecordsPerTask = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
+ long maxNumSegments = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
+ // default to Long.MAX_VALUE to avoid size-based compaction by default
+ long outputSegmentMaxSizeInBytes = Long.MAX_VALUE;
+ try {
+ outputSegmentMaxSizeInBytes = DataSizeUtils.toBytes(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY,
+
MinionConstants.UpsertCompactMergeTask.DEFAULT_OUTPUT_SEGMENT_MAX_SIZE));
+ } catch (Exception e) {
+ LOGGER.warn("Invalid value for outputSegmentMaxSizeInBytes, defaulting
to Long.MAX_VALUE", e);
+ }
+
for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
// check if segment is part of completed segments
if (!candidateSegmentsMap.containsKey(segmentName)) {
@@ -237,6 +266,7 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
+ long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
// Skip segments if the crc from zk metadata and server does not
match. They may be getting reloaded.
if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
@@ -260,8 +290,10 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
continue;
}
+ double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes *
totalValidDocs * 1.0) / totalDocs;
segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k ->
new ArrayList<>())
- .add(new SegmentMergerMetadata(segment, totalValidDocs,
totalInvalidDocs));
+ .add(new SegmentMergerMetadata(segment, totalValidDocs,
totalInvalidDocs,
+ expectedSegmentSizeAfterCompaction));
}
break;
}
@@ -277,17 +309,6 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
for (Map.Entry<Integer, List<SegmentMergerMetadata>> entry :
segmentsEligibleForCompactMerge.entrySet()) {
int partitionID = entry.getKey();
List<SegmentMergerMetadata> segments = entry.getValue();
- // task config thresholds
- // TODO add output segment size as one of the thresholds
- long validDocsThreshold = Long.parseLong(
-
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
-
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
- long maxRecordsPerTask = Long.parseLong(
-
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
-
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
- long maxNumSegments = Long.parseLong(
-
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
-
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
// List to store groups for the current partition
List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
@@ -296,18 +317,22 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
// variables to maintain current group sum
long currentValidDocsSum = 0;
long currentTotalDocsSum = 0;
+ double currentOutputSegmentSizeInBytes = 0.0;
for (SegmentMergerMetadata segment : segments) {
long validDocs = segment.getValidDocIds();
long invalidDocs = segment.getInvalidDocIds();
+ double expectedSegmentSizeInBytes = segment.getSegmentSizeInBytes();
// Check if adding this segment would keep the validDocs sum within
the threshold
if (currentValidDocsSum + validDocs <= validDocsThreshold &&
currentGroup.size() < maxNumSegments
- && currentTotalDocsSum + validDocs + invalidDocs <
maxRecordsPerTask) {
+ && currentTotalDocsSum + validDocs + invalidDocs <
maxRecordsPerTask
+ && currentOutputSegmentSizeInBytes + expectedSegmentSizeInBytes <
outputSegmentMaxSizeInBytes) {
// Add the segment to the current group
currentGroup.add(segment);
currentValidDocsSum += validDocs;
currentTotalDocsSum += validDocs + invalidDocs;
+ currentOutputSegmentSizeInBytes += expectedSegmentSizeInBytes;
} else {
// Finalize the current group and start a new one
if (!currentGroup.isEmpty()) {
@@ -319,6 +344,7 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
currentGroup.add(segment);
currentValidDocsSum = validDocs;
currentTotalDocsSum = validDocs + invalidDocs;
+ currentOutputSegmentSizeInBytes = expectedSegmentSizeInBytes;
}
}
// Add the last group
@@ -408,6 +434,10 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
Preconditions.checkState(upsertConfig.isEnableSnapshot(),
String.format("'enableSnapshot' from UpsertConfig must be enabled for
%s",
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+ // check valid task config for maxOutputSegmentSize
+ if
(taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY))
{
+
DataSizeUtils.toBytes(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY));
+ }
}
@VisibleForTesting
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index 5556ac53cd..3709392e76 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -221,13 +221,13 @@ public class UpsertCompactMergeTaskGeneratorTest {
// single segment
segmentMergerMetadataList =
- List.of(new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10));
+ List.of(new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10, 100000));
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
"fs://testTable__0");
// multiple segments
segmentMergerMetadataList = Arrays.asList(
- new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10),
- new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200,
20)
+ new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10, 100000),
+ new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200,
20, 100000)
);
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
"fs://testTable__0,fs://testTable__1");
@@ -241,13 +241,13 @@ public class UpsertCompactMergeTaskGeneratorTest {
// single segment
segmentMergerMetadataList =
- List.of(new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10));
+ List.of(new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10, 100000));
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
"1000");
// multiple segments
segmentMergerMetadataList = Arrays.asList(
- new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10),
- new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200,
20)
+ new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100,
10, 100000),
+ new
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200,
20, 100000)
);
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
"1000,2000");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]