This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 626c45d717 Upsert small segment merger task in minions (#14477)
626c45d717 is described below

commit 626c45d717d917226aef73dc836fb3aedf431e59
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Thu Dec 12 19:34:37 2024 +0530

    Upsert small segment merger task in minions (#14477)
---
 .../apache/pinot/common/utils/SegmentUtils.java    |   2 +-
 .../apache/pinot/core/common/MinionConstants.java  |  56 +++
 .../framework/SegmentProcessorConfig.java          |  35 +-
 .../framework/SegmentProcessorFramework.java       |   5 +-
 .../UpsertCompactMergeTaskExecutor.java            | 203 ++++++++++
 .../UpsertCompactMergeTaskExecutorFactory.java     |  52 +++
 .../UpsertCompactMergeTaskGenerator.java           | 425 +++++++++++++++++++++
 ...ertCompactMergeTaskProgressObserverFactory.java |  33 ++
 .../UpsertCompactMergeTaskExecutorTest.java        |  95 +++++
 .../UpsertCompactMergeTaskGeneratorTest.java       | 254 ++++++++++++
 10 files changed, 1155 insertions(+), 5 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index 26d231651d..4c594e8bee 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -63,7 +63,7 @@ public class SegmentUtils {
   }
 
   @Nullable
-  private static Integer getPartitionIdFromRealtimeSegmentName(String 
segmentName) {
+  public static Integer getPartitionIdFromRealtimeSegmentName(String 
segmentName) {
     // A fast path to get partition id if the segmentName is in a known format 
like LLC.
     LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
     if (llcSegmentName != null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 48349099b4..08b0eca909 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -65,6 +65,7 @@ public class MinionConstants {
    */
   public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
   public static final String ENABLE_REPLACE_SEGMENTS_KEY = 
"enableReplaceSegments";
+  public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1;
 
   /**
    * Job configs
@@ -223,4 +224,59 @@ public class MinionConstants {
      */
     public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 
"numSegmentsBatchPerServerRequest";
   }
+
+  public static class UpsertCompactMergeTask {
+    public static final String TASK_TYPE = "UpsertCompactMergeTask";
+
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window 
younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+
+    /**
+     * number of segments to query in one batch to fetch valid doc id 
metadata, by default 500
+     */
+    public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 
"numSegmentsBatchPerServerRequest";
+
+    /**
+     * prefix for the new segment name that is created,
+     * {@link 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} 
will add __ as delimiter
+     * so not adding _ as a suffix here.
+     */
+    public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted";
+
+    /**
+     * maximum number of records to process in a single task, sum of all docs 
in to-be-merged segments
+     */
+    public static final String MAX_NUM_RECORDS_PER_TASK_KEY = 
"maxNumRecordsPerTask";
+
+    /**
+     * default maximum number of records to process in a single task, same as 
the value in {@link MergeRollupTask}
+     */
+    public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
+
+    /**
+     * maximum number of records in the output segment
+     */
+    public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = 
"maxNumRecordsPerSegment";
+
+    /**
+     * default maximum number of records in output segment, same as the value 
in
+     * {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig}
+     */
+    public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+
+    /**
+     * maximum number of segments to process in a single task
+     */
+    public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = 
"maxNumSegmentsPerTask";
+
+    /**
+     * default maximum number of segments to process in a single task
+     */
+    public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;
+
+    public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
index 56009608ee..3b22ce84a1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
@@ -23,10 +23,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
 import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
 import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -47,12 +49,15 @@ public class SegmentProcessorConfig {
   private final Map<String, Map<String, String>> 
_aggregationFunctionParameters;
   private final SegmentConfig _segmentConfig;
   private final Consumer<Object> _progressObserver;
+  private final SegmentNameGenerator _segmentNameGenerator;
+  private final Long _customCreationTime;
 
   private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, 
TimeHandlerConfig timeHandlerConfig,
       List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
       Map<String, AggregationFunctionType> aggregationTypes,
       Map<String, Map<String, String>> aggregationFunctionParameters, 
SegmentConfig segmentConfig,
-      Consumer<Object> progressObserver) {
+      Consumer<Object> progressObserver, @Nullable SegmentNameGenerator 
segmentNameGenerator,
+      @Nullable Long customCreationTime) {
     TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     _tableConfig = tableConfig;
     _schema = schema;
@@ -65,6 +70,8 @@ public class SegmentProcessorConfig {
     _progressObserver = (progressObserver != null) ? progressObserver : p -> {
       // Do nothing.
     };
+    _segmentNameGenerator = segmentNameGenerator;
+    _customCreationTime = customCreationTime;
   }
 
   /**
@@ -127,11 +134,20 @@ public class SegmentProcessorConfig {
     return _progressObserver;
   }
 
+  public SegmentNameGenerator getSegmentNameGenerator() {
+    return _segmentNameGenerator;
+  }
+
+  public long getCustomCreationTime() {
+    return _customCreationTime != null ? _customCreationTime : 
System.currentTimeMillis();
+  }
+
   @Override
   public String toString() {
     return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", 
_schema=" + _schema + ", _timeHandlerConfig="
         + _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs 
+ ", _mergeType=" + _mergeType
-        + ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + 
_segmentConfig + '}';
+        + ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + 
_segmentConfig
+        + ", _segmentNameGenerator=" + _segmentNameGenerator + ", 
_customCreationTime=" + _customCreationTime + '}';
   }
 
   /**
@@ -147,6 +163,8 @@ public class SegmentProcessorConfig {
     private Map<String, Map<String, String>> _aggregationFunctionParameters;
     private SegmentConfig _segmentConfig;
     private Consumer<Object> _progressObserver;
+    private SegmentNameGenerator _segmentNameGenerator;
+    private Long _customCreationTime;
 
     public Builder setTableConfig(TableConfig tableConfig) {
       _tableConfig = tableConfig;
@@ -193,6 +211,16 @@ public class SegmentProcessorConfig {
       return this;
     }
 
+    public Builder setSegmentNameGenerator(SegmentNameGenerator 
segmentNameGenerator) {
+      _segmentNameGenerator = segmentNameGenerator;
+      return this;
+    }
+
+    public Builder setCustomCreationTime(Long customCreationTime) {
+      _customCreationTime = customCreationTime;
+      return this;
+    }
+
     public SegmentProcessorConfig build() {
       Preconditions.checkState(_tableConfig != null, "Must provide table 
config in SegmentProcessorConfig");
       Preconditions.checkState(_schema != null, "Must provide schema in 
SegmentProcessorConfig");
@@ -216,7 +244,8 @@ public class SegmentProcessorConfig {
         _segmentConfig = new SegmentConfig.Builder().build();
       }
       return new SegmentProcessorConfig(_tableConfig, _schema, 
_timeHandlerConfig, _partitionerConfigs, _mergeType,
-          _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, 
_progressObserver);
+          _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, 
_progressObserver,
+              _segmentNameGenerator, _customCreationTime);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 378d79fbab..a6d3f74042 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -280,8 +280,11 @@ public class SegmentProcessorFramework {
     SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
     generatorConfig.setOutDir(_segmentsOutputDir.getPath());
     Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
+    
generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));
 
-    if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) 
{
+    if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
+      
generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
+    } else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() 
!= null) {
       generatorConfig.setSegmentNameGenerator(
           SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, 
schema, segmentNamePrefix,
               segmentNamePostfix, fixedSegmentName, false));
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
new file mode 100644
index 0000000000..0f326f3314
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import 
org.apache.pinot.core.segment.processing.framework.DefaultSegmentNumRowProvider;
+import 
org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
+import 
org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.minion.MinionConf;
+import 
org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import 
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
+import 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Minion task that compacts and merges multiple segments of an upsert table 
and uploads it back as one single
+ * segment. This helps in keeping the segment count in check and also prevents 
a lot of small segments created over
+ * time.
+ */
+public class UpsertCompactMergeTaskExecutor extends 
BaseMultipleSegmentsConversionExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);
+
+  public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
+    super(minionConf);
+  }
+
+  @Override
+  protected List<SegmentConversionResult> convert(PinotTaskConfig 
pinotTaskConfig, List<File> segmentDirs,
+      File workingDir)
+      throws Exception {
+    int numInputSegments = segmentDirs.size();
+    _eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + 
numInputSegments);
+    String taskType = pinotTaskConfig.getTaskType();
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new 
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Progress observer
+    segmentProcessorConfigBuilder.setProgressObserver(p -> 
_eventObserver.notifyProgress(_pinotTaskConfig, p));
+
+    // get list of segment metadata
+    List<SegmentMetadataImpl> segmentMetadataList = segmentDirs.stream().map(x 
-> {
+      try {
+        return new SegmentMetadataImpl(x);
+      } catch (Exception e) {
+        throw new RuntimeException(String.format("Error fetching 
segment-metadata for segmentDir: %s", x), e);
+      }
+    }).collect(Collectors.toList());
+
+    // validate if partitionID is same for all small segments. Get partition 
id value for new segment.
+    int partitionID = getCommonPartitionIDForSegments(segmentMetadataList);
+
+    // get the max creation time of the small segments. This will be the index 
creation time for the new segment.
+    Optional<Long> maxCreationTimeOfMergingSegments =
+        
segmentMetadataList.stream().map(SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max);
+    if (maxCreationTimeOfMergingSegments.isEmpty()) {
+      String message = "No valid creation time found for the new merged 
segment. This might be due to "
+          + "missing creation time for merging segments";
+      LOGGER.error(message);
+      throw new RuntimeException(message);
+    }
+
+    // validate if crc of deepstore copies is same as that in ZK of segments
+    List<String> originalSegmentCrcFromTaskGenerator =
+        
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
+    validateCRCForInputSegments(segmentMetadataList, 
originalSegmentCrcFromTaskGenerator);
+
+    // Fetch validDocID snapshot from server and get record-reader for 
compacted reader.
+    List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
+      RoaringBitmap validDocIds = 
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, 
x.getName(),
+          ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
+      if (validDocIds == null) {
+        // no valid crc match found or no validDocIds obtained from all servers
+        // error out the task instead of silently failing so that we can track 
it via task-error metrics
+        String message = String.format("No validDocIds found from all servers. 
They either failed to download "
+            + "or did not match crc from segment copy obtained from deepstore 
/ servers. " + "Expected crc: %s", "");
+        LOGGER.error(message);
+        throw new IllegalStateException(message);
+      }
+      return new CompactedPinotSegmentRecordReader(x.getIndexDir(), 
validDocIds);
+    }).collect(Collectors.toList());
+
+    // create new UploadedRealtimeSegment
+    
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get());
+    segmentProcessorConfigBuilder.setSegmentNameGenerator(
+        new 
UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType),
 partitionID,
+            System.currentTimeMillis(), 
MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
+    SegmentProcessorConfig segmentProcessorConfig = 
segmentProcessorConfigBuilder.build();
+    List<File> outputSegmentDirs;
+    try {
+      _eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
+      outputSegmentDirs = new 
SegmentProcessorFramework(segmentProcessorConfig, workingDir,
+          
SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders),
+          Collections.emptyList(), new 
DefaultSegmentNumRowProvider(Integer.parseInt(
+          
configs.get(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)))).process();
+    } finally {
+      for (RecordReader recordReader : recordReaders) {
+        recordReader.close();
+      }
+    }
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", 
taskType, configs, (endMillis - startMillis));
+
+    List<SegmentConversionResult> results = new ArrayList<>();
+    for (File outputSegmentDir : outputSegmentDirs) {
+      String outputSegmentName = outputSegmentDir.getName();
+      results.add(new 
SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
+          .setTableNameWithType(tableNameWithType).build());
+    }
+    return results;
+  }
+
+  @Override
+  protected SegmentZKMetadataCustomMapModifier 
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+      SegmentConversionResult segmentConversionResult) {
+    Map<String, String> updateMap = new TreeMap<>();
+    updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX,
+        String.valueOf(System.currentTimeMillis()));
+    updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+            + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX,
+        pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
+    return new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
 updateMap);
+  }
+
+  int getCommonPartitionIDForSegments(List<SegmentMetadataImpl> 
segmentMetadataList) {
+    List<String> segmentNames =
+        
segmentMetadataList.stream().map(SegmentMetadataImpl::getName).collect(Collectors.toList());
+    Set<Integer> partitionIDSet = segmentNames.stream().map(x -> {
+      Integer segmentPartitionId = 
SegmentUtils.getPartitionIdFromRealtimeSegmentName(x);
+      if (segmentPartitionId == null) {
+        throw new IllegalStateException(String.format("Partition id not found 
for %s", x));
+      }
+      return segmentPartitionId;
+    }).collect(Collectors.toSet());
+    if (partitionIDSet.size() > 1) {
+      throw new IllegalStateException(
+          "Found segments with different partition ids during task execution: 
" + partitionIDSet);
+    }
+    return partitionIDSet.iterator().next();
+  }
+
+  void validateCRCForInputSegments(List<SegmentMetadataImpl> 
segmentMetadataList, List<String> expectedCRCList) {
+    for (int i = 0; i < segmentMetadataList.size(); i++) {
+      SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+      if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
+        String message = String.format("Crc mismatched between ZK and 
deepstore copy of segment: %s. Expected crc "
+                + "from ZK: %s, crc from deepstore: %s", 
segmentMetadata.getName(), expectedCRCList.get(i),
+            segmentMetadata.getCrc());
+        LOGGER.error(message);
+        throw new IllegalStateException(message);
+      }
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java
new file mode 100644
index 0000000000..b93684dae7
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
+public class UpsertCompactMergeTaskExecutorFactory implements 
PinotTaskExecutorFactory {
+
+  private MinionConf _minionConf;
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf 
minionConf) {
+    _minionConf = minionConf;
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+  }
+
+  @Override
+  public PinotTaskExecutor create() {
+    return new UpsertCompactMergeTaskExecutor(_minionConf);
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
new file mode 100644
index 0000000000..ae3a4aa0d8
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertCompactMergeTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+  private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
+
+  public static class SegmentMergerMetadata {
+    private final SegmentZKMetadata _segmentZKMetadata;
+    private final long _validDocIds;
+    private final long _invalidDocIds;
+
+    SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long 
validDocIds, long invalidDocIds) {
+      _segmentZKMetadata = segmentZKMetadata;
+      _validDocIds = validDocIds;
+      _invalidDocIds = invalidDocIds;
+    }
+
+    public SegmentZKMetadata getSegmentZKMetadata() {
+      return _segmentZKMetadata;
+    }
+
+    public long getValidDocIds() {
+      return _validDocIds;
+    }
+
+    public long getInvalidDocIds() {
+      return _invalidDocIds;
+    }
+  }
+
+  public static class SegmentSelectionResult {
+
+    private final Map<Integer, List<List<SegmentMergerMetadata>>> 
_segmentsForCompactMergeByPartition;
+
+    private final List<String> _segmentsForDeletion;
+
+    SegmentSelectionResult(Map<Integer, List<List<SegmentMergerMetadata>>> 
segmentsForCompactMergeByPartition,
+        List<String> segmentsForDeletion) {
+      _segmentsForCompactMergeByPartition = segmentsForCompactMergeByPartition;
+      _segmentsForDeletion = segmentsForDeletion;
+    }
+
+    public Map<Integer, List<List<SegmentMergerMetadata>>> 
getSegmentsForCompactMergeByPartition() {
+      return _segmentsForCompactMergeByPartition;
+    }
+
+    public List<String> getSegmentsForDeletion() {
+      return _segmentsForDeletion;
+    }
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {}", 
tableNameWithType);
+
+      if (tableConfig.getTaskConfig() == null) {
+        LOGGER.warn("Task config is null for table: {}", tableNameWithType);
+        continue;
+      }
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> incompleteTasks =
+          TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType, 
_clusterInfoAccessor);
+      if (!incompleteTasks.isEmpty()) {
+        LOGGER.warn("Found incomplete tasks: {} for same table: {} and task 
type: {}. Skipping task generation.",
+            incompleteTasks.keySet(), tableNameWithType, taskType);
+        continue;
+      }
+
+      Map<String, String> taskConfigs = 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+      // Get completed segments and filter out the segments based on the 
buffer time configuration
+      List<SegmentZKMetadata> candidateSegments =
+          getCandidateSegments(taskConfigs, allSegments, 
System.currentTimeMillis());
+
+      if (candidateSegments.isEmpty()) {
+        LOGGER.info("No segments were eligible for compactMerge task for 
table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
+      Map<String, List<String>> serverToSegments = 
pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      BiMap<String, String> serverToEndpoints;
+      try {
+        serverToEndpoints = 
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+
+      ServerSegmentMetadataReader serverSegmentMetadataReader =
+          new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
+              _clusterInfoAccessor.getConnectionManager());
+
+      // Number of segments to query per server request. If a table has a lot 
of segments, then we might send a
+      // huge payload to pinot-server in request. Batching the requests will 
help in reducing the payload size.
+      int numSegmentsBatchPerServerRequest = Integer.parseInt(
+          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
+              String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
+
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+          
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
 serverToSegments,
+              serverToEndpoints, null, 60_000, 
ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest);
+
+      Map<String, SegmentZKMetadata> candidateSegmentsMap =
+          
candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
+
+      Set<String> alreadyMergedSegments = 
getAlreadyMergedSegments(allSegments);
+
+      SegmentSelectionResult segmentSelectionResult =
+          processValidDocIdsMetadata(taskConfigs, candidateSegmentsMap, 
validDocIdsMetadataList, alreadyMergedSegments);
+
+      if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+        pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
+            "0d");
+        LOGGER.info(
+            "Deleted segments containing only invalid records for table: {}, 
number of segments to be deleted: {}",
+            tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion());
+      }
+
+      int numTasks = 0;
+      int maxTasks = 
Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
+          String.valueOf(MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS)));
+      for (Map.Entry<Integer, List<List<SegmentMergerMetadata>>> entry
+          : 
segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) {
+        if (numTasks == maxTasks) {
+          break;
+        }
+        List<List<SegmentMergerMetadata>> groups = entry.getValue();
+        // no valid groups found in the partition to merge
+        if (groups.isEmpty()) {
+          continue;
+        }
+        // there are no groups with more than 1 segment to merge
+        // TODO this can be later removed if we want to just do single-segment 
compaction from this task
+        if (groups.get(0).size() <= 1) {
+          continue;
+        }
+        // TODO see if multiple groups of same partition can be added
+        Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig,
+            groups.get(0).stream().map(x -> 
x.getSegmentZKMetadata().getSegmentName()).collect(Collectors.toList())));
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
getDownloadUrl(groups.get(0)));
+        configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
getSegmentCrcList(groups.get(0)));
+        
configs.put(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
 String.valueOf(
+            Long.parseLong(
+                
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+                    
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)))));
+        pinotTaskConfigs.add(new 
PinotTaskConfig(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, configs));
+        numTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {}", 
numTasks, tableNameWithType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @VisibleForTesting
+  public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, 
String> taskConfigs,
+      Map<String, SegmentZKMetadata> candidateSegmentsMap,
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, 
Set<String> alreadyMergedSegments) {
+    Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge 
= new HashMap<>();
+    Set<String> segmentsForDeletion = new HashSet<>();
+    for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
+      // check if segment is part of completed segments
+      if (!candidateSegmentsMap.containsKey(segmentName)) {
+        LOGGER.debug("Segment {} is not found in the candidate segments list, 
skipping it for {}", segmentName,
+            MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+        continue;
+      }
+      SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
+      for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
+        long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
+        long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
+
+        // Skip segments if the CRCs from ZK metadata and the server do not match; they may be getting reloaded.
+        if (segment.getCrc() != 
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
+          LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, 
validDocIdsMetadata={})", segmentName,
+              segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
+          continue;
+        }
+
+        // segments eligible for deletion with no valid records
+        long totalDocs = validDocIdsMetadata.getTotalDocs();
+        if (totalInvalidDocs == totalDocs) {
+          segmentsForDeletion.add(segmentName);
+        } else if (alreadyMergedSegments.contains(segmentName)) {
+          LOGGER.debug("Segment {} already merged. Skipping it for {}", 
segmentName,
+              MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+          break;
+        } else {
+          Integer partitionID = 
SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
+          if (partitionID == null) {
+            LOGGER.warn("Partition ID not found for segment: {}, skipping it 
for {}", segmentName,
+                MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+            continue;
+          }
+          segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> 
new ArrayList<>())
+              .add(new SegmentMergerMetadata(segment, totalValidDocs, 
totalInvalidDocs));
+        }
+        break;
+      }
+    }
+
+    segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) -> 
segmentList.sort(
+        Comparator.comparingLong(o -> 
o.getSegmentZKMetadata().getCreationTime())));
+
+    // Map to store the result: each key (partition) will have a list of groups
+    Map<Integer, List<List<SegmentMergerMetadata>>> groupedSegments = new 
HashMap<>();
+
+    // Iterate over each partition and process its segments list
+    for (Map.Entry<Integer, List<SegmentMergerMetadata>> entry : 
segmentsEligibleForCompactMerge.entrySet()) {
+      int partitionID = entry.getKey();
+      List<SegmentMergerMetadata> segments = entry.getValue();
+      // task config thresholds
+      // TODO add output segment size as one of the thresholds
+      long validDocsThreshold = Long.parseLong(
+          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
+      long maxRecordsPerTask = Long.parseLong(
+          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
+              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
+      long maxNumSegments = Long.parseLong(
+          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
+              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
+
+      // List to store groups for the current partition
+      List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
+      List<SegmentMergerMetadata> currentGroup = new ArrayList<>();
+
+      // variables to maintain current group sum
+      long currentValidDocsSum = 0;
+      long currentTotalDocsSum = 0;
+
+      for (SegmentMergerMetadata segment : segments) {
+        long validDocs = segment.getValidDocIds();
+        long invalidDocs = segment.getInvalidDocIds();
+
+        // Check if adding this segment would keep the validDocs sum within 
the threshold
+        if (currentValidDocsSum + validDocs <= validDocsThreshold && 
currentGroup.size() < maxNumSegments
+            && currentTotalDocsSum + validDocs + invalidDocs < 
maxRecordsPerTask) {
+          // Add the segment to the current group
+          currentGroup.add(segment);
+          currentValidDocsSum += validDocs;
+          currentTotalDocsSum += validDocs + invalidDocs;
+        } else {
+          // Finalize the current group and start a new one
+          if (!currentGroup.isEmpty()) {
+            groups.add(new ArrayList<>(currentGroup));  // Add the finalized 
group
+          }
+
+          // Reset current group, sums and start with the new segment
+          currentGroup = new ArrayList<>();
+          currentGroup.add(segment);
+          currentValidDocsSum = validDocs;
+          currentTotalDocsSum = validDocs + invalidDocs;
+        }
+      }
+      // Add the last group
+      if (!currentGroup.isEmpty()) {
+        groups.add(new ArrayList<>(currentGroup)); // Add a copy of the 
current group
+      }
+
+      // Sort groups by total invalidDocs in descending order; if the invalidDocs counts are the same,
+      // prefer the group containing more (i.e. smaller) segments.
+      // Remove the groups having only 1 segment in them.
+      // TODO this check can be later removed if we want single-segment 
compaction from this task itself
+      List<List<SegmentMergerMetadata>> compactMergeGroups =
+          groups.stream().filter(x -> x.size() > 1).sorted((group1, group2) -> 
{
+            long invalidDocsSum1 = 
group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+            long invalidDocsSum2 = 
group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+            if (invalidDocsSum2 < invalidDocsSum1) {
+              return -1;
+            } else if (invalidDocsSum2 == invalidDocsSum1) {
+              return Long.compare(group2.size(), group1.size());
+            } else {
+              return 1;
+            }
+          }).collect(Collectors.toList());
+
+      if (!compactMergeGroups.isEmpty()) {
+        groupedSegments.put(partitionID, compactMergeGroups);
+      }
+    }
+    return new SegmentSelectionResult(groupedSegments, new 
ArrayList<>(segmentsForDeletion));
+  }
+
+  @VisibleForTesting
+  public static List<SegmentZKMetadata> getCandidateSegments(Map<String, 
String> taskConfigs,
+      List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
+    List<SegmentZKMetadata> candidateSegments = new ArrayList<>();
+    String bufferPeriod =
+        
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY,
 DEFAULT_BUFFER_PERIOD);
+    long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+    for (SegmentZKMetadata segment : allSegments) {
+      // Skip segments if HDFS download url is empty. This also avoids any 
race condition with deepstore upload
+      // retry task and this task
+      if (StringUtils.isBlank(segment.getDownloadUrl())) {
+        LOGGER.warn("Skipping segment {} for task as download url is empty", 
segment.getSegmentName());
+        continue;
+      }
+      // initial segment selection based on status (completed) and age (outside the buffer period)
+      if (segment.getStatus().isCompleted() && (segment.getEndTimeMs() <= 
(currentTimeInMillis - bufferMs))) {
+        candidateSegments.add(segment);
+      }
+    }
+    return candidateSegments;
+  }
+
+  @VisibleForTesting
+  protected static Set<String> 
getAlreadyMergedSegments(List<SegmentZKMetadata> allSegments) {
+    Set<String> alreadyMergedSegments = new HashSet<>();
+    for (SegmentZKMetadata segment : allSegments) {
+      // check if the segment has a custom map containing the list of segments which were merged to form it.
+      // we will later filter out the merged segments as they will be deleted
+      if (segment.getCustomMap() != null && !segment.getCustomMap().isEmpty() 
&& !StringUtils.isBlank(
+          
segment.getCustomMap().get(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+              + 
MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX))) {
+        
alreadyMergedSegments.addAll(List.of(StringUtils.split(segment.getCustomMap().get(
+            MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+                + 
MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX), ",")));
+      }
+    }
+    return alreadyMergedSegments;
+  }
+
+  @Override
+  public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> 
taskConfigs) {
+    // check table is realtime
+    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+        String.format("%s only supports realtime tables!", 
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+    // check upsert enabled
+    Preconditions.checkState(tableConfig.isUpsertEnabled(),
+        String.format("Upsert must be enabled for %s", 
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+    // check no malformed period
+    if 
(taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY))
 {
+      
TimeUtils.convertPeriodToMillis(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY));
+    }
+    // check enableSnapshot = true
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    Preconditions.checkNotNull(upsertConfig,
+        String.format("UpsertConfig must be provided for %s", 
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+    Preconditions.checkState(upsertConfig.isEnableSnapshot(),
+        String.format("'enableSnapshot' from UpsertConfig must be enabled for 
%s",
+            MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+  }
+
+  @VisibleForTesting
+  protected String getDownloadUrl(List<SegmentMergerMetadata> 
segmentMergerMetadataList) {
+    return StringUtils.join(segmentMergerMetadataList.stream().map(x -> 
x.getSegmentZKMetadata().getDownloadUrl())
+        .collect(Collectors.toList()), ",");
+  }
+
+  @VisibleForTesting
+  protected String getSegmentCrcList(List<SegmentMergerMetadata> 
segmentMergerMetadataList) {
+    return StringUtils.join(
+        segmentMergerMetadataList.stream().map(x -> 
String.valueOf(x.getSegmentZKMetadata().getCrc()))
+            .collect(Collectors.toList()), ",");
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java
new file mode 100644
index 0000000000..1a717e88db
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+
+
+@EventObserverFactory
+public class UpsertCompactMergeTaskProgressObserverFactory extends 
BaseMinionProgressObserverFactory {
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java
new file mode 100644
index 0000000000..9c5093d1ab
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+
+public class UpsertCompactMergeTaskExecutorTest {
+  private UpsertCompactMergeTaskExecutor _taskExecutor;
+
+  @BeforeClass
+  public void setUp() {
+    _taskExecutor = new UpsertCompactMergeTaskExecutor(null);
+  }
+
+  @Test
+  public void testValidateCRCForInputSegments() {
+    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
+    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);
+
+    Mockito.when(segment1.getCrc()).thenReturn("1000");
+    Mockito.when(segment2.getCrc()).thenReturn("2000");
+
+    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, 
segment2);
+    List<String> expectedCRCList = Arrays.asList("1000", "2000");
+
+    _taskExecutor.validateCRCForInputSegments(segmentMetadataList, 
expectedCRCList);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testValidateCRCForInputSegmentsWithMismatchedCRC() {
+    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
+    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);
+
+    Mockito.when(segment1.getCrc()).thenReturn("1000");
+    Mockito.when(segment2.getCrc()).thenReturn("3000");
+
+    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, 
segment2);
+    List<String> expectedCRCList = Arrays.asList("1000", "2000");
+
+    _taskExecutor.validateCRCForInputSegments(segmentMetadataList, 
expectedCRCList);
+  }
+
+  @Test
+  public void testGetCommonPartitionIDForSegments() {
+    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
+    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);
+    SegmentMetadataImpl segment3 = Mockito.mock(SegmentMetadataImpl.class);
+
+    Mockito.when(segment1.getName()).thenReturn("testTable__0__0__0");
+    Mockito.when(segment2.getName()).thenReturn("testTable__0__1__0");
+    Mockito.when(segment3.getName()).thenReturn("testTable__0__2__0");
+
+    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, 
segment2, segment3);
+
+    int partitionID = 
_taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
+    Assert.assertEquals(partitionID, 0);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testGetCommonPartitionIDForSegmentsWithDifferentPartitionIDs() {
+    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
+    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);
+
+    Mockito.when(segment1.getName()).thenReturn("testTable__0__0__0");
+    Mockito.when(segment2.getName()).thenReturn("testTable__1__0__0");
+
+    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, 
segment2);
+
+    _taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
new file mode 100644
index 0000000000..5556ac53cd
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class UpsertCompactMergeTaskGeneratorTest {
+
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private UpsertCompactMergeTaskGenerator _taskGenerator;
+  private TableConfig _tableConfig;
+  private SegmentZKMetadata _completedSegment;
+  private SegmentZKMetadata _completedSegment2;
+  private Map<String, SegmentZKMetadata> _completedSegmentsMap;
+
+  @BeforeClass
+  public void setUp() {
+    _taskGenerator = new UpsertCompactMergeTaskGenerator();
+    Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>();
+    Map<String, String> compactionConfigs = new HashMap<>();
+    tableTaskConfigs.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, 
compactionConfigs);
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableSnapshot(true);
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setUpsertConfig(upsertConfig)
+            .setTaskConfig(new TableTaskConfig(tableTaskConfigs)).build();
+
+    _completedSegment = new SegmentZKMetadata("testTable__0");
+    _completedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    _completedSegment.setStartTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("12d"));
+    _completedSegment.setEndTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("11d"));
+    _completedSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+    _completedSegment.setTotalDocs(100L);
+    _completedSegment.setCrc(1000);
+    _completedSegment.setDownloadUrl("fs://testTable__0");
+
+    _completedSegment2 = new SegmentZKMetadata("testTable__1");
+    _completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    _completedSegment2.setStartTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("10d"));
+    _completedSegment2.setEndTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("9d"));
+    _completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS);
+    _completedSegment2.setTotalDocs(10L);
+    _completedSegment2.setCrc(2000);
+    _completedSegment2.setDownloadUrl("fs://testTable__1");
+
+    _completedSegmentsMap = new HashMap<>();
+    _completedSegmentsMap.put(_completedSegment.getSegmentName(), 
_completedSegment);
+    _completedSegmentsMap.put(_completedSegment2.getSegmentName(), 
_completedSegment2);
+  }
+
+  @Test
+  public void testUpsertCompactMergeTaskConfig() {
+
+    // check with OFFLINE table
+    Map<String, String> upsertCompactMergeTaskConfig =
+        ImmutableMap.of("bufferTimePeriod", "5d");
+    TableConfig offlineTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
+            new 
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
+            upsertCompactMergeTaskConfig)))
+        .build();
+    Assert.assertThrows(IllegalStateException.class,
+        () -> _taskGenerator.validateTaskConfigs(offlineTableConfig, 
upsertCompactMergeTaskConfig));
+
+    // check with non-upsert REALTIME table
+    TableConfig nonUpsertRealtimetableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
+            upsertCompactMergeTaskConfig)))
+        .build();
+
+    Assert.assertThrows(IllegalStateException.class,
+        () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimetableConfig, 
upsertCompactMergeTaskConfig));
+
+    // check with snapshot disabled
+    TableConfig disabledSnapshotTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
+            upsertCompactMergeTaskConfig)))
+        .build();
+    Assert.assertThrows(IllegalStateException.class,
+        () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig, 
upsertCompactMergeTaskConfig));
+
+    // valid table configs
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableSnapshot(true);
+    TableConfig validTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(upsertConfig)
+        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
+            upsertCompactMergeTaskConfig)))
+        .build();
+    _taskGenerator.validateTaskConfigs(validTableConfig, 
upsertCompactMergeTaskConfig);
+
+    // invalid buffer time period
+    Map<String, String> upsertCompactMergeTaskConfig1 =
+        ImmutableMap.of("bufferTimePeriod", "5hd");
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> _taskGenerator.validateTaskConfigs(validTableConfig, 
upsertCompactMergeTaskConfig1));
+  }
+
+  @Test
+  public void testGetAlreadyMergedSegments() {
+    SegmentZKMetadata mergedSegment = new 
SegmentZKMetadata("testTable__merged");
+    mergedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    Map<String, String> customMap = new HashMap<>();
+    customMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+        + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX, 
"testTable__0,testTable__1");
+    mergedSegment.setCustomMap(customMap);
+
+    // merged segment present
+    List<SegmentZKMetadata> allSegments = Arrays.asList(_completedSegment, 
_completedSegment2, mergedSegment);
+    Set<String> alreadyMergedSegments = 
UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(allSegments);
+    Assert.assertEquals(alreadyMergedSegments.size(), 2);
+    Assert.assertTrue(alreadyMergedSegments.contains("testTable__0"));
+    Assert.assertTrue(alreadyMergedSegments.contains("testTable__1"));
+
+    // no merging happened till now
+    List<SegmentZKMetadata> segments = Arrays.asList(_completedSegment, 
_completedSegment2);
+    alreadyMergedSegments = 
UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(segments);
+    Assert.assertTrue(alreadyMergedSegments.isEmpty());
+
+    // no segment present, empty list
+    alreadyMergedSegments = 
UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(Collections.emptyList());
+    Assert.assertTrue(alreadyMergedSegments.isEmpty());
+  }
+
+  @Test
+  public void testGetCandidateSegments() {
+    Map<String, String> taskConfigs = new HashMap<>();
+    
taskConfigs.put(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY, 
"5d");
+
+    // candidates are valid: completed, outside the buffer period, and have download URLs
+    List<SegmentZKMetadata> candidateSegments = 
UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
+        new ArrayList<>(_completedSegmentsMap.values()), 
System.currentTimeMillis());
+    Assert.assertEquals(candidateSegments.size(), 2);
+    Assert.assertTrue(candidateSegments.contains(_completedSegment));
+    Assert.assertTrue(candidateSegments.contains(_completedSegment2));
+
+    // candidate have empty download url
+    SegmentZKMetadata segmentWithNoDownloadUrl = new 
SegmentZKMetadata("testTable__2");
+    
segmentWithNoDownloadUrl.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentWithNoDownloadUrl.setStartTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("10d"));
+    segmentWithNoDownloadUrl.setEndTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("9d"));
+    segmentWithNoDownloadUrl.setTimeUnit(TimeUnit.MILLISECONDS);
+    segmentWithNoDownloadUrl.setTotalDocs(100L);
+    segmentWithNoDownloadUrl.setCrc(1000);
+    segmentWithNoDownloadUrl.setDownloadUrl("");
+    candidateSegments = 
UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
+        List.of(segmentWithNoDownloadUrl), System.currentTimeMillis());
+    Assert.assertEquals(candidateSegments.size(), 0);
+
+    // candidates are within buffer period
+    SegmentZKMetadata segmentWithinBufferPeriod = new 
SegmentZKMetadata("testTable__3");
+    
segmentWithinBufferPeriod.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentWithinBufferPeriod.setStartTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("1d"));
+    segmentWithinBufferPeriod.setEndTime(System.currentTimeMillis());
+    segmentWithinBufferPeriod.setTimeUnit(TimeUnit.MILLISECONDS);
+    segmentWithinBufferPeriod.setTotalDocs(100L);
+    segmentWithinBufferPeriod.setCrc(1000);
+    segmentWithinBufferPeriod.setDownloadUrl("fs://testTable__3");
+    candidateSegments = 
UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
+        List.of(segmentWithinBufferPeriod), System.currentTimeMillis());
+    Assert.assertEquals(candidateSegments.size(), 0);
+
+    // no completed segment
+    SegmentZKMetadata incompleteSegment = new 
SegmentZKMetadata("testTable__4");
+    
incompleteSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    incompleteSegment.setStartTime(System.currentTimeMillis() - 
TimeUtils.convertPeriodToMillis("1d"));
+    incompleteSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+    incompleteSegment.setTotalDocs(100L);
+    incompleteSegment.setCrc(1000);
+    candidateSegments = 
UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
+        List.of(incompleteSegment), System.currentTimeMillis());
+    Assert.assertEquals(candidateSegments.size(), 0);
+  }
+
+  @Test
+  public void testGetDownloadUrl() {
+    // empty list
+    List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> 
segmentMergerMetadataList = Arrays.asList();
+    
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), 
"");
+
+    // single segment
+    segmentMergerMetadataList =
+        List.of(new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10));
+    
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), 
"fs://testTable__0");
+
+    // multiple segments
+    segmentMergerMetadataList = Arrays.asList(
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10),
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 
20)
+    );
+    
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
+        "fs://testTable__0,fs://testTable__1");
+  }
+
+  @Test
+  public void testGetSegmentCrcList() {
+    // empty list
+    List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> 
segmentMergerMetadataList = Arrays.asList();
+    
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
 "");
+
+    // single segment
+    segmentMergerMetadataList =
+        List.of(new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10));
+    
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
 "1000");
+
+    // multiple segments
+    segmentMergerMetadataList = Arrays.asList(
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 
10),
+        new 
UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 
20)
+    );
+    
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList),
 "1000,2000");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to