This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4588f8ca34 Config for max output segment size in UpsertCompactMerge 
task (#14742)
4588f8ca34 is described below

commit 4588f8ca345fb3c3740f463409b5a77e8bdc378c
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Tue Jan 7 15:52:54 2025 +0530

    Config for max output segment size in UpsertCompactMerge task (#14742)
    
    * Config for max output segment size in UpsertCompactMerge task
    
    * address comments
---
 .../apache/pinot/core/common/MinionConstants.java  | 10 ++++
 .../UpsertCompactMergeTaskGenerator.java           | 58 ++++++++++++++++------
 .../UpsertCompactMergeTaskGeneratorTest.java       | 12 ++---
 3 files changed, 60 insertions(+), 20 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 7a276d6254..24db1f9ede 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -282,6 +282,16 @@ public class MinionConstants {
      */
     public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = 
"maxNumSegmentsPerTask";
 
+    /**
+     * maximum size of output segments to produce
+     */
+    public static final String OUTPUT_SEGMENT_MAX_SIZE_KEY = 
"outputSegmentMaxSize";
+
+    /**
+     * default maximum size of output segments, used when outputSegmentMaxSize is not configured
+     */
+    public static final String DEFAULT_OUTPUT_SEGMENT_MAX_SIZE = "200MB";
+
     /**
      * default maximum number of segments to process in a single task
      */
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index ae3a4aa0d8..dd7bf28353 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -47,6 +47,7 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,11 +64,14 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
     private final SegmentZKMetadata _segmentZKMetadata;
     private final long _validDocIds;
     private final long _invalidDocIds;
+    private final double _segmentSizeInBytes;
 
-    SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long 
validDocIds, long invalidDocIds) {
+    SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long 
validDocIds, long invalidDocIds,
+        double segmentSizeInBytes) {
       _segmentZKMetadata = segmentZKMetadata;
       _validDocIds = validDocIds;
       _invalidDocIds = invalidDocIds;
+      _segmentSizeInBytes = segmentSizeInBytes;
     }
 
     public SegmentZKMetadata getSegmentZKMetadata() {
@@ -81,6 +85,10 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
     public long getInvalidDocIds() {
       return _invalidDocIds;
     }
+
+    public double getSegmentSizeInBytes() {
+      return _segmentSizeInBytes;
+    }
   }
 
   public static class SegmentSelectionResult {
@@ -226,6 +234,27 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
       Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, 
Set<String> alreadyMergedSegments) {
     Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge 
= new HashMap<>();
     Set<String> segmentsForDeletion = new HashSet<>();
+
+    // task config thresholds
+    long validDocsThreshold = Long.parseLong(
+        
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+            
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
+    long maxRecordsPerTask = Long.parseLong(
+        
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
+            
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
+    long maxNumSegments = Long.parseLong(
+        
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
+            
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
+    // fall back to Long.MAX_VALUE (no size-based limit) only if the configured value cannot be parsed
+    long outputSegmentMaxSizeInBytes = Long.MAX_VALUE;
+    try {
+      outputSegmentMaxSizeInBytes = DataSizeUtils.toBytes(
+          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY,
+              
MinionConstants.UpsertCompactMergeTask.DEFAULT_OUTPUT_SEGMENT_MAX_SIZE));
+    } catch (Exception e) {
+      LOGGER.warn("Invalid value for outputSegmentMaxSizeInBytes, defaulting 
to Long.MAX_VALUE", e);
+    }
+
     for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
       // check if segment is part of completed segments
       if (!candidateSegmentsMap.containsKey(segmentName)) {
@@ -237,6 +266,7 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
       for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
         long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
         long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
+        long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
 
         // Skip segments if the crc from zk metadata and server does not 
match. They may be getting reloaded.
         if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
@@ -260,8 +290,10 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
                 MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
             continue;
           }
+          double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes * 
totalValidDocs * 1.0) / totalDocs;
           segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> 
new ArrayList<>())
-              .add(new SegmentMergerMetadata(segment, totalValidDocs, 
totalInvalidDocs));
+              .add(new SegmentMergerMetadata(segment, totalValidDocs, 
totalInvalidDocs,
+                  expectedSegmentSizeAfterCompaction));
         }
         break;
       }
@@ -277,17 +309,6 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
     for (Map.Entry<Integer, List<SegmentMergerMetadata>> entry : 
segmentsEligibleForCompactMerge.entrySet()) {
       int partitionID = entry.getKey();
       List<SegmentMergerMetadata> segments = entry.getValue();
-      // task config thresholds
-      // TODO add output segment size as one of the thresholds
-      long validDocsThreshold = Long.parseLong(
-          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
-              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
-      long maxRecordsPerTask = Long.parseLong(
-          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
-              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
-      long maxNumSegments = Long.parseLong(
-          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
-              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
 
       // List to store groups for the current partition
       List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
@@ -296,18 +317,22 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
       // variables to maintain current group sum
       long currentValidDocsSum = 0;
       long currentTotalDocsSum = 0;
+      double currentOutputSegmentSizeInBytes = 0.0;
 
       for (SegmentMergerMetadata segment : segments) {
         long validDocs = segment.getValidDocIds();
         long invalidDocs = segment.getInvalidDocIds();
+        double expectedSegmentSizeInBytes = segment.getSegmentSizeInBytes();
 
         // Check if adding this segment would keep the validDocs sum within 
the threshold
         if (currentValidDocsSum + validDocs <= validDocsThreshold && 
currentGroup.size() < maxNumSegments
-            && currentTotalDocsSum + validDocs + invalidDocs < 
maxRecordsPerTask) {
+            && currentTotalDocsSum + validDocs + invalidDocs < 
maxRecordsPerTask
+            && currentOutputSegmentSizeInBytes + expectedSegmentSizeInBytes < 
outputSegmentMaxSizeInBytes) {
           // Add the segment to the current group
           currentGroup.add(segment);
           currentValidDocsSum += validDocs;
           currentTotalDocsSum += validDocs + invalidDocs;
+          currentOutputSegmentSizeInBytes += expectedSegmentSizeInBytes;
         } else {
           // Finalize the current group and start a new one
           if (!currentGroup.isEmpty()) {
@@ -319,6 +344,7 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
           currentGroup.add(segment);
           currentValidDocsSum = validDocs;
           currentTotalDocsSum = validDocs + invalidDocs;
+          currentOutputSegmentSizeInBytes = expectedSegmentSizeInBytes;
         }
       }
       // Add the last group
@@ -408,6 +434,10 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
     Preconditions.checkState(upsertConfig.isEnableSnapshot(),
         String.format("'enableSnapshot' from UpsertConfig must be enabled for 
%s",
             MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+    // validate the outputSegmentMaxSize task config, if provided
+    if 
(taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY))
 {
+      
DataSizeUtils.toBytes(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY));
+    }
   }
 
   @VisibleForTesting
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index 5556ac53cd..3709392e76 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -221,13 +221,13 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     // single segment
     segmentMergerMetadataList =
-        List.of(new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10));
+        List.of(new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10, 100000));
     
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), 
"fs://testTable__0");
 
     // multiple segments
     segmentMergerMetadataList = Arrays.asList(
-        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10),
-        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 
20)
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10, 100000),
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 
20, 100000)
     );
     
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
         "fs://testTable__0,fs://testTable__1");
@@ -241,13 +241,13 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     // single segment
     segmentMergerMetadataList =
-        List.of(new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10));
+        List.of(new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10, 100000));
     
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
 "1000");
 
     // multiple segments
     segmentMergerMetadataList = Arrays.asList(
-        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10),
-        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 
20)
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10, 100000),
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 
20, 100000)
     );
     
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
 "1000,2000");
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to