This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 626c45d717 Upsert small segment merger task in minions (#14477)
626c45d717 is described below
commit 626c45d717d917226aef73dc836fb3aedf431e59
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Thu Dec 12 19:34:37 2024 +0530
Upsert small segment merger task in minions (#14477)
---
.../apache/pinot/common/utils/SegmentUtils.java | 2 +-
.../apache/pinot/core/common/MinionConstants.java | 56 +++
.../framework/SegmentProcessorConfig.java | 35 +-
.../framework/SegmentProcessorFramework.java | 5 +-
.../UpsertCompactMergeTaskExecutor.java | 203 ++++++++++
.../UpsertCompactMergeTaskExecutorFactory.java | 52 +++
.../UpsertCompactMergeTaskGenerator.java | 425 +++++++++++++++++++++
...ertCompactMergeTaskProgressObserverFactory.java | 33 ++
.../UpsertCompactMergeTaskExecutorTest.java | 95 +++++
.../UpsertCompactMergeTaskGeneratorTest.java | 254 ++++++++++++
10 files changed, 1155 insertions(+), 5 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index 26d231651d..4c594e8bee 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -63,7 +63,7 @@ public class SegmentUtils {
}
@Nullable
- private static Integer getPartitionIdFromRealtimeSegmentName(String
segmentName) {
+ public static Integer getPartitionIdFromRealtimeSegmentName(String
segmentName) {
// A fast path to get partition id if the segmentName is in a known format
like LLC.
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName != null) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 48349099b4..08b0eca909 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -65,6 +65,7 @@ public class MinionConstants {
*/
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY =
"enableReplaceSegments";
+ public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1;
/**
* Job configs
@@ -223,4 +224,59 @@ public class MinionConstants {
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST =
"numSegmentsBatchPerServerRequest";
}
+
+ public static class UpsertCompactMergeTask {
+ public static final String TASK_TYPE = "UpsertCompactMergeTask";
+
+ /**
+ * The time period to wait before picking segments for this task
+ * e.g. if set to "2d", no task will be scheduled for a time window
younger than 2 days
+ */
+ public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+
+ /**
+ * number of segments to query in one batch to fetch valid doc id
metadata, by default 500
+ */
+ public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST =
"numSegmentsBatchPerServerRequest";
+
+ /**
+ * prefix for the new segment name that is created,
+ * {@link
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator}
will add __ as delimiter
+ * so not adding _ as a suffix here.
+ */
+ public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted";
+
+ /**
+ * maximum number of records to process in a single task, sum of all docs
in to-be-merged segments
+ */
+ public static final String MAX_NUM_RECORDS_PER_TASK_KEY =
"maxNumRecordsPerTask";
+
+ /**
+ * default maximum number of records to process in a single task, same as
the value in {@link MergeRollupTask}
+ */
+ public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
+
+ /**
+ * maximum number of records in the output segment
+ */
+ public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY =
"maxNumRecordsPerSegment";
+
+ /**
+ * default maximum number of records in output segment, same as the value
in
+ * {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig}
+ */
+ public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+
+ /**
+ * maximum number of segments to process in a single task
+ */
+ public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY =
"maxNumSegmentsPerTask";
+
+ /**
+ * default maximum number of segments to process in a single task
+ */
+ public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;
+
+ public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
index 56009608ee..3b22ce84a1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
@@ -23,10 +23,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -47,12 +49,15 @@ public class SegmentProcessorConfig {
private final Map<String, Map<String, String>>
_aggregationFunctionParameters;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;
+ private final SegmentNameGenerator _segmentNameGenerator;
+ private final Long _customCreationTime;
private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
TimeHandlerConfig timeHandlerConfig,
List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
Map<String, AggregationFunctionType> aggregationTypes,
Map<String, Map<String, String>> aggregationFunctionParameters,
SegmentConfig segmentConfig,
- Consumer<Object> progressObserver) {
+ Consumer<Object> progressObserver, @Nullable SegmentNameGenerator
segmentNameGenerator,
+ @Nullable Long customCreationTime) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
_tableConfig = tableConfig;
_schema = schema;
@@ -65,6 +70,8 @@ public class SegmentProcessorConfig {
_progressObserver = (progressObserver != null) ? progressObserver : p -> {
// Do nothing.
};
+ _segmentNameGenerator = segmentNameGenerator;
+ _customCreationTime = customCreationTime;
}
/**
@@ -127,11 +134,20 @@ public class SegmentProcessorConfig {
return _progressObserver;
}
+ public SegmentNameGenerator getSegmentNameGenerator() {
+ return _segmentNameGenerator;
+ }
+
+ public long getCustomCreationTime() {
+ return _customCreationTime != null ? _customCreationTime :
System.currentTimeMillis();
+ }
+
@Override
public String toString() {
return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ",
_schema=" + _schema + ", _timeHandlerConfig="
+ _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs
+ ", _mergeType=" + _mergeType
- + ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" +
_segmentConfig + '}';
+ + ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" +
_segmentConfig
+ + ", _segmentNameGenerator=" + _segmentNameGenerator + ",
_customCreationTime=" + _customCreationTime + '}';
}
/**
@@ -147,6 +163,8 @@ public class SegmentProcessorConfig {
private Map<String, Map<String, String>> _aggregationFunctionParameters;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;
+ private SegmentNameGenerator _segmentNameGenerator;
+ private Long _customCreationTime;
public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
@@ -193,6 +211,16 @@ public class SegmentProcessorConfig {
return this;
}
+ public Builder setSegmentNameGenerator(SegmentNameGenerator
segmentNameGenerator) {
+ _segmentNameGenerator = segmentNameGenerator;
+ return this;
+ }
+
+ public Builder setCustomCreationTime(Long customCreationTime) {
+ _customCreationTime = customCreationTime;
+ return this;
+ }
+
public SegmentProcessorConfig build() {
Preconditions.checkState(_tableConfig != null, "Must provide table
config in SegmentProcessorConfig");
Preconditions.checkState(_schema != null, "Must provide schema in
SegmentProcessorConfig");
@@ -216,7 +244,8 @@ public class SegmentProcessorConfig {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema,
_timeHandlerConfig, _partitionerConfigs, _mergeType,
- _aggregationTypes, _aggregationFunctionParameters, _segmentConfig,
_progressObserver);
+ _aggregationTypes, _aggregationFunctionParameters, _segmentConfig,
_progressObserver,
+ _segmentNameGenerator, _customCreationTime);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 378d79fbab..a6d3f74042 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -280,8 +280,11 @@ public class SegmentProcessorFramework {
SegmentGeneratorConfig generatorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
+
generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));
- if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null)
{
+ if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
+
generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
+ } else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType()
!= null) {
generatorConfig.setSegmentNameGenerator(
SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig,
schema, segmentNamePrefix,
segmentNamePostfix, fixedSegmentName, false));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
new file mode 100644
index 0000000000..0f326f3314
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import
org.apache.pinot.core.segment.processing.framework.DefaultSegmentNumRowProvider;
+import
org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
+import
org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.minion.MinionConf;
+import
org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
+import
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Minion task that compacts and merges multiple segments of an upsert table
and uploads it back as one single
+ * segment. This helps in keeping the segment count in check and also prevents
a lot of small segments created over
+ * time.
+ */
+public class UpsertCompactMergeTaskExecutor extends
BaseMultipleSegmentsConversionExecutor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);
+
+ public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
+ super(minionConf);
+ }
+
+ @Override
+ protected List<SegmentConversionResult> convert(PinotTaskConfig
pinotTaskConfig, List<File> segmentDirs,
+ File workingDir)
+ throws Exception {
+ int numInputSegments = segmentDirs.size();
+ _eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " +
numInputSegments);
+ String taskType = pinotTaskConfig.getTaskType();
+ Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
+ String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+ TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Progress observer
+ segmentProcessorConfigBuilder.setProgressObserver(p ->
_eventObserver.notifyProgress(_pinotTaskConfig, p));
+
+ // get list of segment metadata
+ List<SegmentMetadataImpl> segmentMetadataList = segmentDirs.stream().map(x
-> {
+ try {
+ return new SegmentMetadataImpl(x);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Error fetching
segment-metadata for segmentDir: %s", x), e);
+ }
+ }).collect(Collectors.toList());
+
+ // validate if partitionID is same for all small segments. Get partition
id value for new segment.
+ int partitionID = getCommonPartitionIDForSegments(segmentMetadataList);
+
+ // get the max creation time of the small segments. This will be the index
creation time for the new segment.
+ Optional<Long> maxCreationTimeOfMergingSegments =
+
segmentMetadataList.stream().map(SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max);
+ if (maxCreationTimeOfMergingSegments.isEmpty()) {
+ String message = "No valid creation time found for the new merged
segment. This might be due to "
+ + "missing creation time for merging segments";
+ LOGGER.error(message);
+ throw new RuntimeException(message);
+ }
+
+ // validate if crc of deepstore copies is same as that in ZK of segments
+ List<String> originalSegmentCrcFromTaskGenerator =
+
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
+ validateCRCForInputSegments(segmentMetadataList,
originalSegmentCrcFromTaskGenerator);
+
+ // Fetch validDocID snapshot from server and get record-reader for
compacted reader.
+ List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
+ RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType,
x.getName(),
+ ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
+ if (validDocIds == null) {
+ // no valid crc match found or no validDocIds obtained from all servers
+ // error out the task instead of silently failing so that we can track
it via task-error metrics
+ String message = String.format("No validDocIds found from all servers.
They either failed to download "
+ + "or did not match crc from segment copy obtained from deepstore
/ servers. " + "Expected crc: %s", "");
+ LOGGER.error(message);
+ throw new IllegalStateException(message);
+ }
+ return new CompactedPinotSegmentRecordReader(x.getIndexDir(),
validDocIds);
+ }).collect(Collectors.toList());
+
+ // create new UploadedRealtimeSegment
+
segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get());
+ segmentProcessorConfigBuilder.setSegmentNameGenerator(
+ new
UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType),
partitionID,
+ System.currentTimeMillis(),
MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null));
+ SegmentProcessorConfig segmentProcessorConfig =
segmentProcessorConfigBuilder.build();
+ List<File> outputSegmentDirs;
+ try {
+ _eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
+ outputSegmentDirs = new
SegmentProcessorFramework(segmentProcessorConfig, workingDir,
+
SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders),
+ Collections.emptyList(), new
DefaultSegmentNumRowProvider(Integer.parseInt(
+
configs.get(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)))).process();
+ } finally {
+ for (RecordReader recordReader : recordReaders) {
+ recordReader.close();
+ }
+ }
+
+ long endMillis = System.currentTimeMillis();
+ LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms",
taskType, configs, (endMillis - startMillis));
+
+ List<SegmentConversionResult> results = new ArrayList<>();
+ for (File outputSegmentDir : outputSegmentDirs) {
+ String outputSegmentName = outputSegmentDir.getName();
+ results.add(new
SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
+ .setTableNameWithType(tableNameWithType).build());
+ }
+ return results;
+ }
+
+ @Override
+ protected SegmentZKMetadataCustomMapModifier
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+ SegmentConversionResult segmentConversionResult) {
+ Map<String, String> updateMap = new TreeMap<>();
+ updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX,
+ String.valueOf(System.currentTimeMillis()));
+ updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX,
+ pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY));
+ return new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
updateMap);
+ }
+
+ int getCommonPartitionIDForSegments(List<SegmentMetadataImpl>
segmentMetadataList) {
+ List<String> segmentNames =
+
segmentMetadataList.stream().map(SegmentMetadataImpl::getName).collect(Collectors.toList());
+ Set<Integer> partitionIDSet = segmentNames.stream().map(x -> {
+ Integer segmentPartitionId =
SegmentUtils.getPartitionIdFromRealtimeSegmentName(x);
+ if (segmentPartitionId == null) {
+ throw new IllegalStateException(String.format("Partition id not found
for %s", x));
+ }
+ return segmentPartitionId;
+ }).collect(Collectors.toSet());
+ if (partitionIDSet.size() > 1) {
+ throw new IllegalStateException(
+ "Found segments with different partition ids during task execution:
" + partitionIDSet);
+ }
+ return partitionIDSet.iterator().next();
+ }
+
+ void validateCRCForInputSegments(List<SegmentMetadataImpl>
segmentMetadataList, List<String> expectedCRCList) {
+ for (int i = 0; i < segmentMetadataList.size(); i++) {
+ SegmentMetadataImpl segmentMetadata = segmentMetadataList.get(i);
+ if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) {
+ String message = String.format("Crc mismatched between ZK and
deepstore copy of segment: %s. Expected crc "
+ + "from ZK: %s, crc from deepstore: %s",
segmentMetadata.getName(), expectedCRCList.get(i),
+ segmentMetadata.getCrc());
+ LOGGER.error(message);
+ throw new IllegalStateException(message);
+ }
+ }
+ }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java
new file mode 100644
index 0000000000..b93684dae7
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
+public class UpsertCompactMergeTaskExecutorFactory implements
PinotTaskExecutorFactory {
+
+ private MinionConf _minionConf;
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf
minionConf) {
+ _minionConf = minionConf;
+ }
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+ }
+
+ @Override
+ public PinotTaskExecutor create() {
+ return new UpsertCompactMergeTaskExecutor(_minionConf);
+ }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
new file mode 100644
index 0000000000..ae3a4aa0d8
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UpsertCompactMergeTaskGenerator.class);
+ private static final String DEFAULT_BUFFER_PERIOD = "2d";
+ private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
+
+ public static class SegmentMergerMetadata {
+ private final SegmentZKMetadata _segmentZKMetadata;
+ private final long _validDocIds;
+ private final long _invalidDocIds;
+
+ SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long
validDocIds, long invalidDocIds) {
+ _segmentZKMetadata = segmentZKMetadata;
+ _validDocIds = validDocIds;
+ _invalidDocIds = invalidDocIds;
+ }
+
+ public SegmentZKMetadata getSegmentZKMetadata() {
+ return _segmentZKMetadata;
+ }
+
+ public long getValidDocIds() {
+ return _validDocIds;
+ }
+
+ public long getInvalidDocIds() {
+ return _invalidDocIds;
+ }
+ }
+
+ public static class SegmentSelectionResult {
+
+ private final Map<Integer, List<List<SegmentMergerMetadata>>>
_segmentsForCompactMergeByPartition;
+
+ private final List<String> _segmentsForDeletion;
+
+ SegmentSelectionResult(Map<Integer, List<List<SegmentMergerMetadata>>>
segmentsForCompactMergeByPartition,
+ List<String> segmentsForDeletion) {
+ _segmentsForCompactMergeByPartition = segmentsForCompactMergeByPartition;
+ _segmentsForDeletion = segmentsForDeletion;
+ }
+
+ public Map<Integer, List<List<SegmentMergerMetadata>>>
getSegmentsForCompactMergeByPartition() {
+ return _segmentsForCompactMergeByPartition;
+ }
+
+ public List<String> getSegmentsForDeletion() {
+ return _segmentsForDeletion;
+ }
+ }
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+ String taskType = MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+ List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+ for (TableConfig tableConfig : tableConfigs) {
+
+ String tableNameWithType = tableConfig.getTableName();
+ LOGGER.info("Start generating task configs for table: {}",
tableNameWithType);
+
+ if (tableConfig.getTaskConfig() == null) {
+ LOGGER.warn("Task config is null for table: {}", tableNameWithType);
+ continue;
+ }
+
+ // Only schedule 1 task of this type, per table
+ Map<String, TaskState> incompleteTasks =
+ TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType,
_clusterInfoAccessor);
+ if (!incompleteTasks.isEmpty()) {
+ LOGGER.warn("Found incomplete tasks: {} for same table: {} and task
type: {}. Skipping task generation.",
+ incompleteTasks.keySet(), tableNameWithType, taskType);
+ continue;
+ }
+
+ Map<String, String> taskConfigs =
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+ List<SegmentZKMetadata> allSegments =
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+ // Get completed segments and filter out the segments based on the
buffer time configuration
+ List<SegmentZKMetadata> candidateSegments =
+ getCandidateSegments(taskConfigs, allSegments,
System.currentTimeMillis());
+
+ if (candidateSegments.isEmpty()) {
+ LOGGER.info("No segments were eligible for compactMerge task for
table: {}", tableNameWithType);
+ continue;
+ }
+
+ // get server to segment mappings
+ PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
+ Map<String, List<String>> serverToSegments =
pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+ BiMap<String, String> serverToEndpoints;
+ try {
+ serverToEndpoints =
pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ } catch (InvalidConfigException e) {
+ throw new RuntimeException(e);
+ }
+
+ ServerSegmentMetadataReader serverSegmentMetadataReader =
+ new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
+ _clusterInfoAccessor.getConnectionManager());
+
+ // Number of segments to query per server request. If a table has a lot
of segments, then we might send a
+ // huge payload to pinot-server in request. Batching the requests will
help in reducing the payload size.
+ int numSegmentsBatchPerServerRequest = Integer.parseInt(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
+ String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
+
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments,
+ serverToEndpoints, null, 60_000,
ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest);
+
+ Map<String, SegmentZKMetadata> candidateSegmentsMap =
+
candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
Function.identity()));
+
+ Set<String> alreadyMergedSegments =
getAlreadyMergedSegments(allSegments);
+
+ SegmentSelectionResult segmentSelectionResult =
+ processValidDocIdsMetadata(taskConfigs, candidateSegmentsMap,
validDocIdsMetadataList, alreadyMergedSegments);
+
+ if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+ pinotHelixResourceManager.deleteSegments(tableNameWithType,
segmentSelectionResult.getSegmentsForDeletion(),
+ "0d");
+ LOGGER.info(
+ "Deleted segments containing only invalid records for table: {},
number of segments to be deleted: {}",
+ tableNameWithType,
segmentSelectionResult.getSegmentsForDeletion());
+ }
+
+ int numTasks = 0;
+ int maxTasks =
Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
+ String.valueOf(MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS)));
+ for (Map.Entry<Integer, List<List<SegmentMergerMetadata>>> entry
+ :
segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) {
+ if (numTasks == maxTasks) {
+ break;
+ }
+ List<List<SegmentMergerMetadata>> groups = entry.getValue();
+ // no valid groups found in the partition to merge
+ if (groups.isEmpty()) {
+ continue;
+ }
+ // there are no groups with more than 1 segment to merge
+ // TODO this can be later removed if we want to just do single-segment
compaction from this task
+ if (groups.get(0).size() <= 1) {
+ continue;
+ }
+ // TODO see if multiple groups of same partition can be added
+ Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig,
+ groups.get(0).stream().map(x ->
x.getSegmentZKMetadata().getSegmentName()).collect(Collectors.toList())));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY,
getDownloadUrl(groups.get(0)));
+ configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
+ configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
getSegmentCrcList(groups.get(0)));
+
configs.put(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
String.valueOf(
+ Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)))));
+ pinotTaskConfigs.add(new
PinotTaskConfig(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, configs));
+ numTasks++;
+ }
+ LOGGER.info("Finished generating {} tasks configs for table: {}",
numTasks, tableNameWithType);
+ }
+ return pinotTaskConfigs;
+ }
+
+ @VisibleForTesting
+ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String,
String> taskConfigs,
+ Map<String, SegmentZKMetadata> candidateSegmentsMap,
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
Set<String> alreadyMergedSegments) {
+ Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge
= new HashMap<>();
+ Set<String> segmentsForDeletion = new HashSet<>();
+ for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
+ // check if segment is part of completed segments
+ if (!candidateSegmentsMap.containsKey(segmentName)) {
+ LOGGER.debug("Segment {} is not found in the candidate segments list,
skipping it for {}", segmentName,
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+ continue;
+ }
+ SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
+ for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
+ long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
+ long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
+
+ // Skip segments if the crc from zk metadata and server does not
match. They may be getting reloaded.
+ if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
+ LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={},
validDocIdsMetadata={})", segmentName,
+ segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
+ continue;
+ }
+
+ // segments eligible for deletion with no valid records
+ long totalDocs = validDocIdsMetadata.getTotalDocs();
+ if (totalInvalidDocs == totalDocs) {
+ segmentsForDeletion.add(segmentName);
+ } else if (alreadyMergedSegments.contains(segmentName)) {
+ LOGGER.debug("Segment {} already merged. Skipping it for {}",
segmentName,
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+ break;
+ } else {
+ Integer partitionID =
SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
+ if (partitionID == null) {
+ LOGGER.warn("Partition ID not found for segment: {}, skipping it
for {}", segmentName,
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
+ continue;
+ }
+ segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k ->
new ArrayList<>())
+ .add(new SegmentMergerMetadata(segment, totalValidDocs,
totalInvalidDocs));
+ }
+ break;
+ }
+ }
+
+ segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) ->
segmentList.sort(
+ Comparator.comparingLong(o ->
o.getSegmentZKMetadata().getCreationTime())));
+
+ // Map to store the result: each key (partition) will have a list of groups
+ Map<Integer, List<List<SegmentMergerMetadata>>> groupedSegments = new
HashMap<>();
+
+ // Iterate over each partition and process its segments list
+ for (Map.Entry<Integer, List<SegmentMergerMetadata>> entry :
segmentsEligibleForCompactMerge.entrySet()) {
+ int partitionID = entry.getKey();
+ List<SegmentMergerMetadata> segments = entry.getValue();
+ // task config thresholds
+ // TODO add output segment size as one of the thresholds
+ long validDocsThreshold = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
+ long maxRecordsPerTask = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
+ long maxNumSegments = Long.parseLong(
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
+
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
+
+ // List to store groups for the current partition
+ List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
+ List<SegmentMergerMetadata> currentGroup = new ArrayList<>();
+
+ // variables to maintain current group sum
+ long currentValidDocsSum = 0;
+ long currentTotalDocsSum = 0;
+
+ for (SegmentMergerMetadata segment : segments) {
+ long validDocs = segment.getValidDocIds();
+ long invalidDocs = segment.getInvalidDocIds();
+
+ // Check if adding this segment would keep the validDocs sum within
the threshold
+ if (currentValidDocsSum + validDocs <= validDocsThreshold &&
currentGroup.size() < maxNumSegments
+ && currentTotalDocsSum + validDocs + invalidDocs <
maxRecordsPerTask) {
+ // Add the segment to the current group
+ currentGroup.add(segment);
+ currentValidDocsSum += validDocs;
+ currentTotalDocsSum += validDocs + invalidDocs;
+ } else {
+ // Finalize the current group and start a new one
+ if (!currentGroup.isEmpty()) {
+ groups.add(new ArrayList<>(currentGroup)); // Add the finalized
group
+ }
+
+ // Reset current group, sums and start with the new segment
+ currentGroup = new ArrayList<>();
+ currentGroup.add(segment);
+ currentValidDocsSum = validDocs;
+ currentTotalDocsSum = validDocs + invalidDocs;
+ }
+ }
+ // Add the last group
+ if (!currentGroup.isEmpty()) {
+ groups.add(new ArrayList<>(currentGroup)); // Add a copy of the
current group
+ }
+
+ // Sort groups by total invalidDocs in descending order, if invalidDocs
count are same, prefer group with
+ // higher number of small segments in them
+ // remove the groups having only 1 segments in them
+ // TODO this check can be later removed if we want single-segment
compaction from this task itself
+ List<List<SegmentMergerMetadata>> compactMergeGroups =
+ groups.stream().filter(x -> x.size() > 1).sorted((group1, group2) ->
{
+ long invalidDocsSum1 =
group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+ long invalidDocsSum2 =
group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+ if (invalidDocsSum2 < invalidDocsSum1) {
+ return -1;
+ } else if (invalidDocsSum2 == invalidDocsSum1) {
+ return Long.compare(group2.size(), group1.size());
+ } else {
+ return 1;
+ }
+ }).collect(Collectors.toList());
+
+ if (!compactMergeGroups.isEmpty()) {
+ groupedSegments.put(partitionID, compactMergeGroups);
+ }
+ }
+ return new SegmentSelectionResult(groupedSegments, new
ArrayList<>(segmentsForDeletion));
+ }
+
+ @VisibleForTesting
+ public static List<SegmentZKMetadata> getCandidateSegments(Map<String,
String> taskConfigs,
+ List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
+ List<SegmentZKMetadata> candidateSegments = new ArrayList<>();
+ String bufferPeriod =
+
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY,
DEFAULT_BUFFER_PERIOD);
+ long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+ for (SegmentZKMetadata segment : allSegments) {
+ // Skip segments if HDFS download url is empty. This also avoids any
race condition with deepstore upload
+ // retry task and this task
+ if (StringUtils.isBlank(segment.getDownloadUrl())) {
+ LOGGER.warn("Skipping segment {} for task as download url is empty",
segment.getSegmentName());
+ continue;
+ }
+ // initial segments selection based on status and age
+ if (segment.getStatus().isCompleted() && (segment.getEndTimeMs() <=
(currentTimeInMillis - bufferMs))) {
+ candidateSegments.add(segment);
+ }
+ }
+ return candidateSegments;
+ }
+
+ @VisibleForTesting
+ protected static Set<String>
getAlreadyMergedSegments(List<SegmentZKMetadata> allSegments) {
+ Set<String> alreadyMergedSegments = new HashSet<>();
+ for (SegmentZKMetadata segment : allSegments) {
+ // check if the segment has custom map having list of segments which
merged to form this. we will later
+ // filter out the merged segments as they will be deleted
+ if (segment.getCustomMap() != null && !segment.getCustomMap().isEmpty()
&& !StringUtils.isBlank(
+
segment.getCustomMap().get(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ +
MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX))) {
+
alreadyMergedSegments.addAll(List.of(StringUtils.split(segment.getCustomMap().get(
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE
+ +
MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX), ",")));
+ }
+ }
+ return alreadyMergedSegments;
+ }
+
+ @Override
+ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String>
taskConfigs) {
+ // check table is realtime
+ Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+ String.format("%s only supports realtime tables!",
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+ // check upsert enabled
+ Preconditions.checkState(tableConfig.isUpsertEnabled(),
+ String.format("Upsert must be enabled for %s",
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+ // check no malformed period
+ if
(taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY))
{
+
TimeUtils.convertPeriodToMillis(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY));
+ }
+ // check enableSnapshot = true
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ Preconditions.checkNotNull(upsertConfig,
+ String.format("UpsertConfig must be provided for %s",
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+ Preconditions.checkState(upsertConfig.isEnableSnapshot(),
+ String.format("'enableSnapshot' from UpsertConfig must be enabled for
%s",
+ MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
+ }
+
+ @VisibleForTesting
+ protected String getDownloadUrl(List<SegmentMergerMetadata>
segmentMergerMetadataList) {
+ return StringUtils.join(segmentMergerMetadataList.stream().map(x ->
x.getSegmentZKMetadata().getDownloadUrl())
+ .collect(Collectors.toList()), ",");
+ }
+
+ @VisibleForTesting
+ protected String getSegmentCrcList(List<SegmentMergerMetadata>
segmentMergerMetadataList) {
+ return StringUtils.join(
+ segmentMergerMetadataList.stream().map(x ->
String.valueOf(x.getSegmentZKMetadata().getCrc()))
+ .collect(Collectors.toList()), ",");
+ }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java
new file mode 100644
index 0000000000..1a717e88db
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+
+
+@EventObserverFactory
+public class UpsertCompactMergeTaskProgressObserverFactory extends
BaseMinionProgressObserverFactory {
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
+ }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java
new file mode 100644
index 0000000000..9c5093d1ab
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+
/**
 * Unit tests for the input-validation helpers of {@code UpsertCompactMergeTaskExecutor}: CRC verification of
 * input segments and the common-partition-id check.
 */
public class UpsertCompactMergeTaskExecutorTest {
  // Executor under test; constructed with a null minion config since the tested helpers do not read it.
  private UpsertCompactMergeTaskExecutor _taskExecutor;

  @BeforeClass
  public void setUp() {
    _taskExecutor = new UpsertCompactMergeTaskExecutor(null);
  }

  // When every segment's CRC matches the expected list (position by position), validation must not throw.
  @Test
  public void testValidateCRCForInputSegments() {
    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);

    Mockito.when(segment1.getCrc()).thenReturn("1000");
    Mockito.when(segment2.getCrc()).thenReturn("2000");

    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2);
    List<String> expectedCRCList = Arrays.asList("1000", "2000");

    _taskExecutor.validateCRCForInputSegments(segmentMetadataList, expectedCRCList);
  }

  // A CRC differing from the expected value (segment2: 3000 vs expected 2000) must fail validation.
  @Test(expectedExceptions = IllegalStateException.class)
  public void testValidateCRCForInputSegmentsWithMismatchedCRC() {
    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);

    Mockito.when(segment1.getCrc()).thenReturn("1000");
    Mockito.when(segment2.getCrc()).thenReturn("3000");

    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2);
    List<String> expectedCRCList = Arrays.asList("1000", "2000");

    _taskExecutor.validateCRCForInputSegments(segmentMetadataList, expectedCRCList);
  }

  // All LLC segment names share partition id 0, so the common partition id must be 0.
  @Test
  public void testGetCommonPartitionIDForSegments() {
    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);
    SegmentMetadataImpl segment3 = Mockito.mock(SegmentMetadataImpl.class);

    Mockito.when(segment1.getName()).thenReturn("testTable__0__0__0");
    Mockito.when(segment2.getName()).thenReturn("testTable__0__1__0");
    Mockito.when(segment3.getName()).thenReturn("testTable__0__2__0");

    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2, segment3);

    int partitionID = _taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
    Assert.assertEquals(partitionID, 0);
  }

  // Segments spanning multiple partitions (0 and 1) cannot be merged together and must be rejected.
  @Test(expectedExceptions = IllegalStateException.class)
  public void testGetCommonPartitionIDForSegmentsWithDifferentPartitionIDs() {
    SegmentMetadataImpl segment1 = Mockito.mock(SegmentMetadataImpl.class);
    SegmentMetadataImpl segment2 = Mockito.mock(SegmentMetadataImpl.class);

    Mockito.when(segment1.getName()).thenReturn("testTable__0__0__0");
    Mockito.when(segment2.getName()).thenReturn("testTable__1__0__0");

    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(segment1, segment2);

    _taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
  }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
new file mode 100644
index 0000000000..5556ac53cd
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
/**
 * Unit tests for {@code UpsertCompactMergeTaskGenerator}: task config validation, candidate segment selection,
 * already-merged segment detection, and download-url / CRC list construction.
 */
public class UpsertCompactMergeTaskGeneratorTest {

  private static final String RAW_TABLE_NAME = "testTable";
  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
  private UpsertCompactMergeTaskGenerator _taskGenerator;
  // upsert REALTIME table config with snapshots enabled, built in setUp
  private TableConfig _tableConfig;
  // two completed segments with deep-store urls, ending 11d and 9d ago respectively
  private SegmentZKMetadata _completedSegment;
  private SegmentZKMetadata _completedSegment2;
  private Map<String, SegmentZKMetadata> _completedSegmentsMap;

  @BeforeClass
  public void setUp() {
    _taskGenerator = new UpsertCompactMergeTaskGenerator();
    Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>();
    Map<String, String> compactionConfigs = new HashMap<>();
    tableTaskConfigs.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, compactionConfigs);
    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setEnableSnapshot(true);
    _tableConfig =
        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
            .setUpsertConfig(upsertConfig)
            .setTaskConfig(new TableTaskConfig(tableTaskConfigs)).build();

    _completedSegment = new SegmentZKMetadata("testTable__0");
    _completedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    _completedSegment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("12d"));
    _completedSegment.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("11d"));
    _completedSegment.setTimeUnit(TimeUnit.MILLISECONDS);
    _completedSegment.setTotalDocs(100L);
    _completedSegment.setCrc(1000);
    _completedSegment.setDownloadUrl("fs://testTable__0");

    _completedSegment2 = new SegmentZKMetadata("testTable__1");
    _completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    _completedSegment2.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("10d"));
    _completedSegment2.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("9d"));
    _completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS);
    _completedSegment2.setTotalDocs(10L);
    _completedSegment2.setCrc(2000);
    _completedSegment2.setDownloadUrl("fs://testTable__1");

    _completedSegmentsMap = new HashMap<>();
    _completedSegmentsMap.put(_completedSegment.getSegmentName(), _completedSegment);
    _completedSegmentsMap.put(_completedSegment2.getSegmentName(), _completedSegment2);
  }

  // validateTaskConfigs must reject OFFLINE, non-upsert and snapshot-disabled tables, accept a valid
  // realtime upsert table, and reject a malformed buffer period.
  @Test
  public void testUpsertCompactMergeTaskConfig() {

    // check with OFFLINE table
    Map<String, String> upsertCompactMergeTaskConfig =
        ImmutableMap.of("bufferTimePeriod", "5d");
    TableConfig offlineTableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
            new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
                upsertCompactMergeTaskConfig)))
            .build();
    Assert.assertThrows(IllegalStateException.class,
        () -> _taskGenerator.validateTaskConfigs(offlineTableConfig, upsertCompactMergeTaskConfig));

    // check with non-upsert REALTIME table
    TableConfig nonUpsertRealtimetableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
            upsertCompactMergeTaskConfig)))
        .build();

    Assert.assertThrows(IllegalStateException.class,
        () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimetableConfig, upsertCompactMergeTaskConfig));

    // check with snapshot disabled
    TableConfig disabledSnapshotTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
            upsertCompactMergeTaskConfig)))
        .build();
    Assert.assertThrows(IllegalStateException.class,
        () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig, upsertCompactMergeTaskConfig));

    // valid table configs
    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setEnableSnapshot(true);
    TableConfig validTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
        .setUpsertConfig(upsertConfig)
        .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
            upsertCompactMergeTaskConfig)))
        .build();
    _taskGenerator.validateTaskConfigs(validTableConfig, upsertCompactMergeTaskConfig);

    // invalid buffer time period ("5hd" is not a parseable period)
    Map<String, String> upsertCompactMergeTaskConfig1 =
        ImmutableMap.of("bufferTimePeriod", "5hd");
    Assert.assertThrows(IllegalArgumentException.class,
        () -> _taskGenerator.validateTaskConfigs(validTableConfig, upsertCompactMergeTaskConfig1));
  }

  // getAlreadyMergedSegments must return exactly the names recorded in a merged segment's ZK custom map.
  @Test
  public void testGetAlreadyMergedSegments() {
    SegmentZKMetadata mergedSegment = new SegmentZKMetadata("testTable__merged");
    mergedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    Map<String, String> customMap = new HashMap<>();
    customMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
        + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX, "testTable__0,testTable__1");
    mergedSegment.setCustomMap(customMap);

    // merged segment present
    List<SegmentZKMetadata> allSegments = Arrays.asList(_completedSegment, _completedSegment2, mergedSegment);
    Set<String> alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(allSegments);
    Assert.assertEquals(alreadyMergedSegments.size(), 2);
    Assert.assertTrue(alreadyMergedSegments.contains("testTable__0"));
    Assert.assertTrue(alreadyMergedSegments.contains("testTable__1"));

    // no merging happened till now
    List<SegmentZKMetadata> segments = Arrays.asList(_completedSegment, _completedSegment2);
    alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(segments);
    Assert.assertTrue(alreadyMergedSegments.isEmpty());

    // no segment present, empty list
    alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(Collections.emptyList());
    Assert.assertTrue(alreadyMergedSegments.isEmpty());
  }

  // getCandidateSegments must keep completed, buffered-out segments with urls and drop the rest.
  @Test
  public void testGetCandidateSegments() {
    Map<String, String> taskConfigs = new HashMap<>();
    taskConfigs.put(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY, "5d");

    // candidates are valid, outside buffer period and download urls
    List<SegmentZKMetadata> candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        new ArrayList<>(_completedSegmentsMap.values()), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 2);
    Assert.assertTrue(candidateSegments.contains(_completedSegment));
    Assert.assertTrue(candidateSegments.contains(_completedSegment2));

    // candidate have empty download url
    SegmentZKMetadata segmentWithNoDownloadUrl = new SegmentZKMetadata("testTable__2");
    segmentWithNoDownloadUrl.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    segmentWithNoDownloadUrl.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("10d"));
    segmentWithNoDownloadUrl.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("9d"));
    segmentWithNoDownloadUrl.setTimeUnit(TimeUnit.MILLISECONDS);
    segmentWithNoDownloadUrl.setTotalDocs(100L);
    segmentWithNoDownloadUrl.setCrc(1000);
    segmentWithNoDownloadUrl.setDownloadUrl("");
    candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        List.of(segmentWithNoDownloadUrl), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 0);

    // candidates are within buffer period (ended now, buffer is 5d)
    SegmentZKMetadata segmentWithinBufferPeriod = new SegmentZKMetadata("testTable__3");
    segmentWithinBufferPeriod.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    segmentWithinBufferPeriod.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
    segmentWithinBufferPeriod.setEndTime(System.currentTimeMillis());
    segmentWithinBufferPeriod.setTimeUnit(TimeUnit.MILLISECONDS);
    segmentWithinBufferPeriod.setTotalDocs(100L);
    segmentWithinBufferPeriod.setCrc(1000);
    segmentWithinBufferPeriod.setDownloadUrl("fs://testTable__3");
    candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        List.of(segmentWithinBufferPeriod), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 0);

    // no completed segment (still consuming)
    SegmentZKMetadata incompleteSegment = new SegmentZKMetadata("testTable__4");
    incompleteSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
    incompleteSegment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
    incompleteSegment.setTimeUnit(TimeUnit.MILLISECONDS);
    incompleteSegment.setTotalDocs(100L);
    incompleteSegment.setCrc(1000);
    candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        List.of(incompleteSegment), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 0);
  }

  // getDownloadUrl must join the group's download urls with commas, preserving group order.
  @Test
  public void testGetDownloadUrl() {
    // empty list
    List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> segmentMergerMetadataList = Arrays.asList();
    Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "");

    // single segment
    segmentMergerMetadataList =
        List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
    Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0");

    // multiple segments
    segmentMergerMetadataList = Arrays.asList(
        new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
        new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20)
    );
    Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
        "fs://testTable__0,fs://testTable__1");
  }

  // getSegmentCrcList must join the group's CRC values with commas, preserving group order.
  @Test
  public void testGetSegmentCrcList() {
    // empty list
    List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> segmentMergerMetadataList = Arrays.asList();
    Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "");

    // single segment
    segmentMergerMetadataList =
        List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
    Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000");

    // multiple segments
    segmentMergerMetadataList = Arrays.asList(
        new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
        new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20)
    );
    Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000,2000");
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]