This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 68260f9870 Add support for orphan segment cleanup (#15142)
68260f9870 is described below
commit 68260f98705e15a4005bdcb8ea4d454736aec652
Author: 9aman <[email protected]>
AuthorDate: Tue Mar 18 22:42:50 2025 +0530
Add support for orphan segment cleanup (#15142)
* Add support for orphan segment cleanup
* Add unit tests to the code
* Minor refactoring
* Fixing checkstyle violations
* Add support for realtime tables
* Fix test failures
* Adding metrics, improving logs for better debuggability
* Reducing code repetition
* Add support for providing batch size for number of untracked segments to
be deleted in a single run of RetentionManager
* Add test cases to test batch sizes
* Provide additional comments for test class
* Fixing linting issues
* Adding a controller config to enable/disable deletion of untracked
segments from deepstore during retention manager run
---
.../pinot/common/metrics/ControllerGauge.java | 6 +-
.../apache/pinot/controller/ControllerConf.java | 13 +
.../helix/core/SegmentDeletionManager.java | 4 +-
.../helix/core/retention/RetentionManager.java | 162 +++++++++-
.../core/retention/strategy/RetentionStrategy.java | 13 +-
.../retention/strategy/TimeRetentionStrategy.java | 14 +-
.../helix/core/retention/RetentionManagerTest.java | 348 ++++++++++++++++++++-
.../SegmentsValidationAndRetentionConfig.java | 10 +
8 files changed, 538 insertions(+), 32 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 40e2d09b4f..4777573604 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -190,7 +190,11 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress",
true),
// Resource utilization is within limits or not for a table
- RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded",
false);
+ RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded",
false),
+
+ // The number of segments in deepstore that do not have corresponding
metadata in ZooKeeper.
+ // These segments are untracked and should be considered for deletion based
on retention policies.
+ UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false);
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index d3396cd3a7..384371e2e8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -241,6 +241,10 @@ public class ControllerConf extends PinotConfiguration {
public static final String TMP_SEGMENT_RETENTION_IN_SECONDS =
"controller.realtime.segment.tmpFileRetentionInSeconds";
+ // Enables the deletion of untracked segments during the retention manager
run.
+ // Untracked segments are those that exist in deep store but have no
corresponding entry in the ZK property store.
+ public static final String ENABLE_UNTRACKED_SEGMENT_DELETION =
+ "controller.retentionManager.untrackedSegmentDeletionEnabled";
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND =
60 * 60; // 1 Hour.
@@ -1081,6 +1085,15 @@ public class ControllerConf extends PinotConfiguration {
ControllerPeriodicTasksConf.DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND);
}
+ public boolean getUntrackedSegmentDeletionEnabled() {
+ return
getProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
false);
+ }
+
+ public void setUntrackedSegmentDeletionEnabled(boolean
untrackedSegmentDeletionEnabled) {
+ setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
untrackedSegmentDeletionEnabled);
+ }
+
+
public long getPinotTaskManagerInitialDelaySeconds() {
return getPeriodicTaskInitialDelayInSeconds();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index e5ab692fc9..c001042f9a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -222,7 +222,9 @@ public class SegmentDeletionManager {
URI segmentMetadataUri =
SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(),
segmentId);
if (pinotFS.exists(segmentMetadataUri)) {
LOGGER.info("Deleting segment metadata {} from {}", segmentId,
segmentMetadataUri);
- pinotFS.delete(segmentMetadataUri, true);
+ if (!pinotFS.delete(segmentMetadataUri, true)) {
+ LOGGER.warn("Could not delete segment metadata: {} from: {}",
segmentId, segmentMetadataUri);
+ }
}
} catch (IOException e) {
LOGGER.warn("Could not delete segment metadata {} from {}", segmentId,
segmentFileUri, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9bd8b9f62f..5b365d7768 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -18,18 +18,26 @@
*/
package org.apache.pinot.controller.helix.core.retention;
+import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.logging.log4j.util.Strings;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -39,6 +47,9 @@ import
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionSt
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -55,7 +66,9 @@ import org.slf4j.LoggerFactory;
*/
public class RetentionManager extends ControllerPeriodicTask<Void> {
public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(5L);
+ public static final int DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE = 100;
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
+ private final boolean _untrackedSegmentDeletionEnabled;
private static final Logger LOGGER =
LoggerFactory.getLogger(RetentionManager.class);
@@ -64,7 +77,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
super("RetentionManager",
config.getRetentionControllerFrequencyInSeconds(),
config.getRetentionManagerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
-
+ _untrackedSegmentDeletionEnabled =
config.getUntrackedSegmentDeletionEnabled();
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}",
getIntervalInSeconds());
}
@@ -105,6 +118,10 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
String retentionTimeValue = validationConfig.getRetentionTimeValue();
+ int untrackedSegmentsDeletionBatchSize =
+ validationConfig.getUntrackedSegmentsDeletionBatchSize() != null ?
Integer.parseInt(
+ validationConfig.getUntrackedSegmentsDeletionBatchSize()) :
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;
+
RetentionStrategy retentionStrategy;
try {
retentionStrategy = new
TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
@@ -117,15 +134,23 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
// Scan all segment ZK metadata and purge segments if necessary
if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
+ manageRetentionForOfflineTable(tableNameWithType, retentionStrategy,
untrackedSegmentsDeletionBatchSize);
} else {
- manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
+ manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy,
untrackedSegmentsDeletionBatchSize);
}
}
- private void manageRetentionForOfflineTable(String offlineTableName,
RetentionStrategy retentionStrategy) {
- List<String> segmentsToDelete = new ArrayList<>();
- for (SegmentZKMetadata segmentZKMetadata :
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+ private void manageRetentionForOfflineTable(String offlineTableName,
RetentionStrategy retentionStrategy,
+ int untrackedSegmentsDeletionBatchSize) {
+ List<SegmentZKMetadata> segmentZKMetadataList =
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+
+ // fetch those segments that are beyond the retention period and don't
have an entry in ZK i.e.
+ // SegmentZkMetadata is missing for those segments
+ List<String> segmentsToDelete =
+ getSegmentsToDeleteFromDeepstore(offlineTableName, retentionStrategy,
segmentZKMetadataList,
+ untrackedSegmentsDeletionBatchSize);
+
+ for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (retentionStrategy.isPurgeable(offlineTableName, segmentZKMetadata)) {
segmentsToDelete.add(segmentZKMetadata.getSegmentName());
}
@@ -136,11 +161,20 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
}
- private void manageRetentionForRealtimeTable(String realtimeTableName,
RetentionStrategy retentionStrategy) {
- List<String> segmentsToDelete = new ArrayList<>();
+ private void manageRetentionForRealtimeTable(String realtimeTableName,
RetentionStrategy retentionStrategy,
+ int untrackedSegmentsDeletionBatchSize) {
+ List<SegmentZKMetadata> segmentZKMetadataList =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+
+ // fetch those segments that are beyond the retention period and don't
have an entry in ZK i.e.
+ // SegmentZkMetadata is missing for those segments
+ List<String> segmentsToDelete =
+ getSegmentsToDeleteFromDeepstore(realtimeTableName, retentionStrategy,
segmentZKMetadataList,
+ untrackedSegmentsDeletionBatchSize);
+
IdealState idealState = _pinotHelixResourceManager.getHelixAdmin()
.getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(),
realtimeTableName);
- for (SegmentZKMetadata segmentZKMetadata :
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
+
+ for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
String segmentName = segmentZKMetadata.getSegmentName();
if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
// Delete old LLC segment that hangs around. Do not delete segment
that are current since there may be a race
@@ -189,6 +223,116 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
}
+ private List<String> getSegmentsToDeleteFromDeepstore(String
tableNameWithType, RetentionStrategy retentionStrategy,
+ List<SegmentZKMetadata> segmentZKMetadataList, int
untrackedSegmentsDeletionBatchSize) {
+ List<String> segmentsToDelete = new ArrayList<>();
+
+ if (!_untrackedSegmentDeletionEnabled) {
+ LOGGER.info(
+ "Not scanning deep store for untracked segments for table: {}",
tableNameWithType);
+ return segmentsToDelete;
+ }
+
+ if (untrackedSegmentsDeletionBatchSize <= 0) {
+ // return an empty list in case untracked segment deletion batch size is
configured <= 0 in table config
+ LOGGER.info(
+ "Not scanning deep store for untracked segments for table: {} as
untrackedSegmentsDeletionBatchSize is set "
+ + "to: {}",
+ tableNameWithType, untrackedSegmentsDeletionBatchSize);
+ return segmentsToDelete;
+ }
+
+ List<String> segmentsPresentInZK =
+
segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
+ try {
+ LOGGER.info("Fetch segments present in deep store that are beyond
retention period for table: {}",
+ tableNameWithType);
+ segmentsToDelete =
+ findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType,
retentionStrategy, segmentsPresentInZK);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.UNTRACKED_SEGMENTS_COUNT,
+ segmentsToDelete.size());
+
+ if (segmentsToDelete.size() > untrackedSegmentsDeletionBatchSize) {
+ LOGGER.info("Truncating segments to delete from {} to {} for table:
{}",
+ segmentsToDelete.size(), untrackedSegmentsDeletionBatchSize,
tableNameWithType);
+ segmentsToDelete = segmentsToDelete.subList(0,
untrackedSegmentsDeletionBatchSize);
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Unable to fetch segments from deep store that are beyond
retention period for table: {}",
+ tableNameWithType);
+ }
+
+ return segmentsToDelete;
+ }
+
+
+ /**
+ * Identifies segments in deepstore that are ready for deletion based on the
retention strategy.
+ *
+ * This method finds segments that are beyond the retention period and are
ready to be purged.
+ * It only considers segments that do not have entries in ZooKeeper metadata
i.e. untracked segments.
+ * The lastModified time of the file in deepstore is used to determine
whether the segment
+ * should be retained or purged.
+ *
+ * @param tableNameWithType Name of the offline table
+ * @param retentionStrategy Strategy to determine if a segment should be
purged
+ * @param segmentsToExclude List of segment names that should be excluded
from deletion
+ * @return List of segment names that should be deleted from deepstore
+ * @throws IOException If there's an error accessing the filesystem
+ */
+ private List<String> findUntrackedSegmentsToDeleteFromDeepstore(String
tableNameWithType,
+ RetentionStrategy retentionStrategy, List<String> segmentsToExclude)
+ throws IOException {
+
+ List<String> segmentsToDelete = new ArrayList<>();
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ URI tableDataUri =
URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName);
+ PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme());
+
+ long startTimeMs = System.currentTimeMillis();
+
+ List<FileMetadata> deepstoreFiles =
pinotFS.listFilesWithMetadata(tableDataUri, false);
+ long listEndTimeMs = System.currentTimeMillis();
+ LOGGER.info("Found: {} segments in deepstore for table: {}. Time taken to
list segments: {} ms",
+ deepstoreFiles.size(), tableNameWithType, listEndTimeMs - startTimeMs);
+
+ for (FileMetadata fileMetadata : deepstoreFiles) {
+ if (fileMetadata.isDirectory()) {
+ continue;
+ }
+
+ String segmentName = extractSegmentName(fileMetadata.getFilePath());
+ if (Strings.isEmpty(segmentName) ||
segmentsToExclude.contains(segmentName)) {
+ continue;
+ }
+
+ // determine whether the segment should be purged or not based on the
last modified time of the file
+ long lastModifiedTime = fileMetadata.getLastModifiedTime();
+
+ if (retentionStrategy.isPurgeable(tableNameWithType, segmentName,
lastModifiedTime)) {
+ segmentsToDelete.add(segmentName);
+ }
+ }
+ long endTimeMs = System.currentTimeMillis();
+ LOGGER.info(
+ "Took: {} ms to identify {} segments for deletion from deep store for
table: {} as they have no corresponding"
+ + " entry in the property store.",
+ endTimeMs - startTimeMs, segmentsToDelete.size(), tableNameWithType);
+ return segmentsToDelete;
+ }
+
+ @Nullable
+ private String extractSegmentName(@Nullable String filePath) {
+ if (Strings.isEmpty(filePath)) {
+ return null;
+ }
+ String segmentName = filePath.substring(filePath.lastIndexOf("/") + 1);
+ if (segmentName.endsWith(TarCompressionUtils.TAR_GZ_FILE_EXTENSION)) {
+ segmentName = segmentName.substring(0, segmentName.length() -
TarCompressionUtils.TAR_GZ_FILE_EXTENSION.length());
+ }
+ return segmentName;
+ }
+
private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
String tableNameWithType = tableConfig.getTableName();
List<String> segmentsToDelete = new ArrayList<>();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
index e8f6336961..8e31cce37c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
@@ -34,4 +34,15 @@ public interface RetentionStrategy {
* @return Whether the segment should be purged
*/
boolean isPurgeable(String tableNameWithType, SegmentZKMetadata
segmentZKMetadata);
-}
+
+ /**
+ * Determines whether a segment is eligible for purging
+ *
+ * @param tableNameWithType The table name, including its type.
+ * @param segmentName The name of the segment to evaluate.
+ * @param segmentTimeMs The segment's timestamp in milliseconds, which could
be the end time from ZK metadata or
+ * the modification time (mTime) for the file in deep
store etc.
+ * @return {@code true} if the segment should be purged; {@code false}
otherwise.
+ */
+ boolean isPurgeable(String tableNameWithType, String segmentName, long
segmentTimeMs);
+ }
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index b98fe5b534..cda94dbb5d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,15 +39,19 @@ public class TimeRetentionStrategy implements
RetentionStrategy {
@Override
public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata
segmentZKMetadata) {
- long endTimeMs = segmentZKMetadata.getEndTimeMs();
+ return isPurgeable(tableNameWithType, segmentZKMetadata.getSegmentName(),
segmentZKMetadata.getEndTimeMs());
+ }
+
+ @Override
+ public boolean isPurgeable(String tableNameWithType, String segmentName,
long segmentTimeMs) {
// Check that the end time is between 1971 and 2071
- if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
- LOGGER.warn("Segment: {} of table: {} has invalid end time in millis:
{}", segmentZKMetadata.getSegmentName(),
- tableNameWithType, endTimeMs);
+ if (!TimeUtils.timeValueInValidRange(segmentTimeMs)) {
+ LOGGER.warn("Segment: {} of table: {} has invalid end time in millis:
{}", segmentName,
+ tableNameWithType, segmentTimeMs);
return false;
}
- return System.currentTimeMillis() - endTimeMs > _retentionMs;
+ return System.currentTimeMillis() - segmentTimeMs > _retentionMs;
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index b3e656de9e..f4a71c55f1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -18,10 +18,16 @@
*/
package org.apache.pinot.controller.helix.core.retention;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -42,12 +48,16 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static
org.apache.pinot.controller.helix.core.retention.RetentionManager.DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -57,7 +67,31 @@ public class RetentionManagerTest {
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME);
- private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit,
long dayAfterTomorrowTimeStamp) {
+ // Variables for real file test
+ private Path _tempDir;
+ private File _tableDir;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ // Setup for real file test
+ _tempDir = Files.createTempDirectory("pinot-retention-test");
+ _tableDir = new File(_tempDir.toFile(), TEST_TABLE_NAME);
+ _tableDir.mkdirs();
+
+ final long pastMillisSinceEpoch = 1343001600000L;
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ // Clean up the temporary directory after each test
+ if (_tempDir != null) {
+ FileUtils.deleteDirectory(_tempDir.toFile());
+ }
+ }
+
+ private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit,
long dayAfterTomorrowTimeStamp,
+ String untrackedSegmentsDeletionBatchSize, int
untrackedSegmentsInDeepstoreSize) {
List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
// Create metadata for 10 segments really old, that will be removed by the
retention manager.
final int numOlderSegments = 10;
@@ -73,19 +107,77 @@ public class RetentionManagerTest {
mockSegmentZKMetadata(dayAfterTomorrowTimeStamp,
dayAfterTomorrowTimeStamp, timeUnit);
segmentsZKMetadata.add(segmentZKMetadata);
}
+
+ // Create actual segment files with specific modification times
+ // 1. A file that should be kept (in ZK metadata)
+ File segment1File = new File(_tableDir,
segmentsZKMetadata.get(0).getSegmentName());
+ createFileWithContent(segment1File, "segment1 data");
+ setFileModificationTime(segment1File, timeUnit.toMillis(pastTimeStamp));
+
+ // 2. A file that should be kept (in ZK metadata)
+ File segment2File = new File(_tableDir,
segmentsZKMetadata.get(10).getSegmentName());
+ createFileWithContent(segment2File, "segment2 data");
+ setFileModificationTime(segment2File, timeUnit.toMillis(pastTimeStamp));
+
+ // 3. A file that should not be deleted (not in ZK metadata but recent)
+ File segment3File = new File(_tableDir, "segment3.tar.gz");
+ createFileWithContent(segment3File, "segment3 data");
+ setFileModificationTime(segment3File,
timeUnit.toMillis(dayAfterTomorrowTimeStamp));
+
+ int deletionBatchSize = untrackedSegmentsDeletionBatchSize == null ?
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE
+ : Integer.parseInt(untrackedSegmentsDeletionBatchSize);
+
+ // Create additional untracked segment files to test batch size limit
+ if (untrackedSegmentsInDeepstoreSize > 0) {
+ // Create more untracked segments
+ for (int i = 0; i < untrackedSegmentsInDeepstoreSize; i++) {
+ String segmentName = "extraSegment" + i;
+ File segmentFile = new File(_tableDir, segmentName);
+ createFileWithContent(segmentFile, "extra segment " + i + " data");
+ setFileModificationTime(segmentFile, timeUnit.toMillis(pastTimeStamp));
+ if (i < deletionBatchSize) {
+ // Add segments to the removed list till we reach
untrackedSegmentsDeletionBatchSize
+ removedSegments.add(segmentName);
+ }
+ }
+ }
+
final TableConfig tableConfig = createOfflineTableConfig();
+ // Set untrackedSegmentsDeletionBatchSize if not null
+ if (untrackedSegmentsDeletionBatchSize != null) {
+
tableConfig.getValidationConfig().setUntrackedSegmentsDeletionBatchSize(untrackedSegmentsDeletionBatchSize);
+ }
+
LeadControllerManager leadControllerManager =
mock(LeadControllerManager.class);
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
- setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager, leadControllerManager);
+
+ // Use appropriate setup based on test case
+ // In case of untrackedSegmentsDeletionBatchSize <
untrackedSegmentsInDeepstoreSize, we cannot guarantee which
+ // files/segments will be picked for deletion as there is no ordering/
sorting done before selecting
+ // untrackedSegmentsDeletionBatchSize out of
untrackedSegmentsInDeepstoreSize to delete.
+ // For the case untrackedSegmentsDeletionBatchSize <
untrackedSegmentsInDeepstoreSize we just check the size of the
+ // segments that will get deleted.
+ // if the untrackedSegmentsDeletionBatchSize is not set, all the segments will be
deleted as the batch size by default is 100
+ if (deletionBatchSize >= untrackedSegmentsInDeepstoreSize) {
+ // Use original setup for the case when all the segments will be included
+ setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager, leadControllerManager);
+ } else {
+ // Use batch size specific setup
+ setupPinotHelixResourceManagerForBatchSize(tableConfig, numOlderSegments,
+ deletionBatchSize, segmentsZKMetadata,
+ pinotHelixResourceManager, leadControllerManager);
+ }
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata);
+
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
ControllerConf conf = new ControllerConf();
ControllerMetrics controllerMetrics = new
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
+ conf.setUntrackedSegmentDeletionEnabled(true);
RetentionManager retentionManager =
new RetentionManager(pinotHelixResourceManager, leadControllerManager,
conf, controllerMetrics);
retentionManager.start();
@@ -93,52 +185,212 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
- // Verify that the removeAgedDeletedSegments() method in deletion manager
is actually called.
+ // Verify that the removeAgedDeletedSegments() method in deletion manager
is called
verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager);
- // Verify that the deleteSegments method is actually called.
- verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(),
anyList());
+ // Verify deleteSegments is called
+ verify(pinotHelixResourceManager,
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
}
@Test
- public void testRetentionWithMinutes() {
+ public void testRetentionWithMinutesNoBatchSizeAndSegmentsInDeepStore() {
final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60;
final long pastMinutesSinceEpoch = 22383360L;
- testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES,
minutesSinceEpochTimeStamp);
+ testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES,
minutesSinceEpochTimeStamp, null, 4);
+ }
+
+ @Test
+ public void testRetentionWithMinutesNoBatchSizeAndMoreSegmentsInDeepStore() {
+ // For this test the default batch size will get picked
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60;
+ final long pastMinutesSinceEpoch = 22383360L;
+ testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES,
minutesSinceEpochTimeStamp, null, 105);
+ }
+
+
+ @Test
+ public void
testRetentionWithMinutesWithBatchSizeAndLessSegmentsInDeepStore() {
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60;
+ final long pastMinutesSinceEpoch = 22383360L;
+ testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES,
minutesSinceEpochTimeStamp, "5", 3);
+ }
+
+ @Test
+ public void
testRetentionWithMinutesWithBatchSizeAndMoreSegmentsInDeepStore() {
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60;
+ final long pastMinutesSinceEpoch = 22383360L;
+ testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES,
minutesSinceEpochTimeStamp, "5", 10);
+ }
+
+
+ @Test
+ public void testRetentionWithSecondsNoBatchSizeAndSegmentsInDeepStore() {
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60 * 60;
+ final long pastSecondsSinceEpoch = 1343001600L;
+ testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS,
secondsSinceEpochTimeStamp, null, 4);
+ }
+
+ @Test
+ public void
testRetentionWithSecondsWithBatchSizeAndLessSegmentsInDeepStore() {
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60 * 60;
+ final long pastSecondsSinceEpoch = 1343001600L;
+ testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS,
secondsSinceEpochTimeStamp, "5", 3);
}
@Test
- public void testRetentionWithSeconds() {
+ public void
testRetentionWithSecondsWithBatchSizeAndMoreSegmentsInDeepStore() {
final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60 * 60;
final long pastSecondsSinceEpoch = 1343001600L;
- testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS,
secondsSinceEpochTimeStamp);
+ testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS,
secondsSinceEpochTimeStamp, "5", 10);
}
@Test
- public void testRetentionWithMillis() {
+ public void testRetentionWithMillisNoBatchSizeAndSegmentsInDeepStore() {
final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60 * 60 * 1000;
final long pastMillisSinceEpoch = 1343001600000L;
- testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS,
millisSinceEpochTimeStamp);
+ testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS,
millisSinceEpochTimeStamp, null, 4);
}
@Test
- public void testRetentionWithHours() {
+ public void testRetentionWithMillisWithBatchSizeAndLessSegmentsInDeepStore()
{
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60 * 60 * 1000;
+ final long pastMillisSinceEpoch = 1343001600000L;
+ testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS,
millisSinceEpochTimeStamp, "5", 3);
+ }
+
+ @Test
+ public void testRetentionWithMillisWithBatchSizeAndMoreSegmentsInDeepStore()
{
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24
* 60 * 60 * 1000;
+ final long pastMillisSinceEpoch = 1343001600000L;
+ testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS,
millisSinceEpochTimeStamp, "5", 10);
+ }
+
+ @Test
+ public void testRetentionWithHoursNoBatchSizeAndSegmentsInDeepStore() {
final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
final long pastHoursSinceEpoch = 373056L;
- testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS,
hoursSinceEpochTimeStamp);
+ testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS,
hoursSinceEpochTimeStamp, null, 4);
}
@Test
- public void testRetentionWithDays() {
+ public void testRetentionWithHoursWithBatchSizeAndLessSegmentsInDeepStore() {
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
+ final long pastHoursSinceEpoch = 373056L;
+ testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS,
hoursSinceEpochTimeStamp, "5", 3);
+ }
+
+ @Test
+ public void testRetentionWithHoursWithBatchSizeAndMoreSegmentsInDeepStore() {
+ final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() /
1000 / 60 / 60 / 24 + 2;
+ final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
+ final long pastHoursSinceEpoch = 373056L;
+ testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS,
hoursSinceEpochTimeStamp, "5", 10);
+ }
+
+
+ @Test
+ public void testRetentionWithDaysNoBatchSizeAndSegmentsInDeepStore() {
+ final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 /
60 / 60 / 24 + 2;
+ final long pastDaysSinceEpoch = 15544L;
+ testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS,
daysSinceEpochTimeStamp, null, 4);
+ }
+
+ @Test
+ public void testRetentionWithDaysWithBatchSizeAndLessSegmentsInDeepStore() {
final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 /
60 / 60 / 24 + 2;
final long pastDaysSinceEpoch = 15544L;
- testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS,
daysSinceEpochTimeStamp);
+ testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS,
daysSinceEpochTimeStamp, "5", 3);
}
+ @Test
+ public void testRetentionWithDaysWithBatchSizeAndMoreSegmentsInDeepStore() {
+ final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 /
60 / 60 / 24 + 2;
+ final long pastDaysSinceEpoch = 15544L;
+ testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS,
daysSinceEpochTimeStamp, "5", 10);
+ }
+
+ @Test
+ public void testOffByDefaultForUntrackedSegmentsDeletion() {
+ long pastTimeStamp = 15544L;
+ TimeUnit timeUnit = TimeUnit.DAYS;
+ long dayAfterTomorrowTimeStamp = System.currentTimeMillis() / 1000 / 60 /
60 / 24 + 2;
+
+ List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+ // Create metadata for 10 really old segments that will be removed by the
retention manager.
+ final int numOlderSegments = 10;
+ List<String> removedSegments = new ArrayList<>();
+ for (int i = 0; i < numOlderSegments; i++) {
+ SegmentZKMetadata segmentZKMetadata =
mockSegmentZKMetadata(pastTimeStamp, pastTimeStamp, timeUnit);
+ segmentsZKMetadata.add(segmentZKMetadata);
+ removedSegments.add(segmentZKMetadata.getSegmentName());
+ }
+ // Create metadata for 5 segments that will not be removed.
+ for (int i = 0; i < 5; i++) {
+ SegmentZKMetadata segmentZKMetadata =
+ mockSegmentZKMetadata(dayAfterTomorrowTimeStamp,
dayAfterTomorrowTimeStamp, timeUnit);
+ segmentsZKMetadata.add(segmentZKMetadata);
+ }
+
+ // Create actual segment files with specific modification times
+ // 1. A file that should be kept (in ZK metadata)
+ File segment1File = new File(_tableDir,
segmentsZKMetadata.get(0).getSegmentName());
+ createFileWithContent(segment1File, "segment1 data");
+ setFileModificationTime(segment1File, timeUnit.toMillis(pastTimeStamp));
+
+ // 2. A file that should be kept (in ZK metadata)
+ File segment2File = new File(_tableDir,
segmentsZKMetadata.get(10).getSegmentName());
+ createFileWithContent(segment2File, "segment2 data");
+ setFileModificationTime(segment2File, timeUnit.toMillis(pastTimeStamp));
+
+ // 3. A file that should NOT be deleted, because deletion of untracked
segments is off by default
+ File segment3File = new File(_tableDir, "segment3.tar.gz");
+ createFileWithContent(segment3File, "segment3 data");
+ setFileModificationTime(segment3File, timeUnit.toMillis(pastTimeStamp));
+
+ final TableConfig tableConfig = createOfflineTableConfig();
+
+ LeadControllerManager leadControllerManager =
mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
+
+ setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager, leadControllerManager);
+
+
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
+
when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata);
+
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
+
+ ControllerConf conf = new ControllerConf();
+ ControllerMetrics controllerMetrics = new
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ conf.setRetentionControllerFrequencyInSeconds(0);
+ conf.setDeletedSegmentsRetentionInDays(0);
+
+ RetentionManager retentionManager =
+ new RetentionManager(pinotHelixResourceManager, leadControllerManager,
conf, controllerMetrics);
+ retentionManager.start();
+ retentionManager.run();
+
+ SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
+
+ // Verify that the removeAgedDeletedSegments() method in the deletion manager
is called
+ verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager);
+
+ // Verify deleteSegments is called
+ verify(pinotHelixResourceManager,
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
+ }
+
+
private TableConfig createOfflineTableConfig() {
return new
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setRetentionTimeUnit("DAYS")
.setRetentionTimeValue("365").setNumReplicas(2).build();
@@ -179,6 +431,47 @@ public class RetentionManagerTest {
}).when(resourceManager).deleteSegments(anyString(), anyList());
}
+ private void setupPinotHelixResourceManagerForBatchSize(TableConfig
tableConfig, int numOlderSegments,
+ int untrackedSegmentsDeletionBatchSize, List<SegmentZKMetadata>
segmentsZKMetadata,
+ PinotHelixResourceManager resourceManager, LeadControllerManager
leadControllerManager) {
+
+ String tableNameWithType = tableConfig.getTableName();
+
when(resourceManager.getAllTables()).thenReturn(List.of(tableNameWithType));
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ SegmentDeletionManager deletionManager =
mock(SegmentDeletionManager.class);
+ doAnswer(invocationOnMock ->
null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
+
+ // Set up verification for deleteSegments with focus on the count and
segment inclusion rules
+ doAnswer(invocationOnMock -> {
+ Object[] args = invocationOnMock.getArguments();
+ String tableNameArg = (String) args[0];
+ assertEquals(tableNameArg, tableNameWithType);
+ List<String> segmentListArg = (List<String>) args[1];
+
+ // Verify all the old metadata segments are included
+ for (int i = 0; i < numOlderSegments; i++) {
+
assertTrue(segmentListArg.contains(segmentsZKMetadata.get(i).getSegmentName()));
+ }
+
+ // Verify segment3 (recent untracked segment) is NOT included
+ assertFalse(segmentListArg.contains("segment3.tar.gz"));
+
+ // Calculate expected total segments that should be deleted
+ // ZK metadata segments + untracked segments up to the batch size limit
+ int expectedTotalSegments = numOlderSegments +
untrackedSegmentsDeletionBatchSize;
+
+ // Verify the total count is as expected
+ assertEquals(expectedTotalSegments, segmentListArg.size());
+
+ return null;
+ }).when(resourceManager).deleteSegments(anyString(), anyList());
+ }
+
+
// This test makes sure that we clean up the segments marked OFFLINE in
realtime for more than 7 days
@Test
public void testRealtimeLLCCleanup() {
@@ -194,6 +487,7 @@ public class RetentionManagerTest {
PinotHelixResourceManager pinotHelixResourceManager =
setupSegmentMetadata(tableConfig, now, initialNumSegments,
removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager, leadControllerManager);
+
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
ControllerConf conf = new ControllerConf();
ControllerMetrics controllerMetrics = new
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
@@ -226,6 +520,7 @@ public class RetentionManagerTest {
PinotHelixResourceManager pinotHelixResourceManager =
setupSegmentMetadataForPausedTable(tableConfig, now, removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager, leadControllerManager);
+
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
ControllerConf conf = new ControllerConf();
ControllerMetrics controllerMetrics = new
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
@@ -372,4 +667,27 @@ public class RetentionManagerTest {
when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime));
return segmentZKMetadata;
}
+
+ /**
+ * Helper method to create a file with content
+ */
+ private void createFileWithContent(File file, String content) {
+ try {
+ Files.write(file.toPath(), content.getBytes());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Helper method to set file modification time
+ */
+ private void setFileModificationTime(File file, long timestamp) {
+ FileTime fileTime = FileTime.fromMillis(timestamp);
+ try {
+ Files.setLastModifiedTime(file.toPath(), fileTime);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 592a6c1960..2bfdc051a6 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -55,6 +55,8 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
// For more usage of this field, please refer to this design doc:
https://tinyurl.com/f63ru4sb
private String _peerSegmentDownloadScheme;
+ private String _untrackedSegmentsDeletionBatchSize;
+
/**
* @deprecated Use {@link InstanceAssignmentConfig} instead
*/
@@ -250,4 +252,12 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
public void setMinimizeDataMovement(boolean minimizeDataMovement) {
_minimizeDataMovement = minimizeDataMovement;
}
+
+ public String getUntrackedSegmentsDeletionBatchSize() {
+ return _untrackedSegmentsDeletionBatchSize;
+ }
+
+ public void setUntrackedSegmentsDeletionBatchSize(String
untrackedSegmentsDeletionBatchSize) {
+ _untrackedSegmentsDeletionBatchSize = untrackedSegmentsDeletionBatchSize;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]