This is an automated email from the ASF dual-hosted git repository.
swaminathanmanish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0ce2b21b730 Refactor deep store delete code (#18678)
0ce2b21b730 is described below
commit 0ce2b21b7307c726ffde9eada73ef4bb386dc378
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed Jun 10 16:55:19 2026 +0530
Refactor deep store delete code (#18678)
* Refactor deep store delete code
* Add RetentionManager factory hook and FakePropertyStore bulk-read support
* Factory for Retention Manager and Segment Deletion Manager in their tests
* Use LinkedHashSet to keep deep-store deletion order deterministic
deleteSegmentsFromPropertyStore returned a HashSet that was iterated by
removeSegmentsFromStoreInBatch, making the deep-store deletion order depend
on hash bucket layout. Switch to LinkedHashSet to preserve the input order.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../common/utils/helix/FakePropertyStore.java | 41 +++++++
.../pinot/controller/BaseControllerStarter.java | 14 ++-
.../helix/core/PinotHelixResourceManager.java | 8 +-
.../helix/core/SegmentDeletionManager.java | 133 +++++++++++++--------
.../helix/core/retention/RetentionManager.java | 23 ++--
.../helix/core/retention/RetentionManagerTest.java | 23 ++--
.../core/util/SegmentDeletionManagerTest.java | 13 +-
7 files changed, 178 insertions(+), 77 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
index 2c5806aab92..6e146504f85 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils.helix;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,6 +58,46 @@ public class FakePropertyStore extends
ZkHelixPropertyStore<ZNRecord> {
.collect(Collectors.toList());
}
+ /**
+ * Bulk-read variant that returns the stored {@link ZNRecord} for each
immediate child of
+ * {@code parentPath}. Populates {@code stats} with the per-record stat in
the same order as the
+ * returned list. The real Helix implementation goes through {@code
_baseAccessor}, which is
+ * {@code null} in this fake; this override backs the same read shape from
the in-memory map so
+ * tests that rely on bulk reads work end-to-end.
+ */
+ @Override
+ public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int
options) {
+ List<String> childNames = getChildNames(parentPath, options);
+ List<ZNRecord> out = new ArrayList<>(childNames.size());
+ if (stats != null) {
+ stats.clear();
+ }
+ for (String childName : childNames) {
+ String childPath = parentPath + "/" + childName;
+ ZNRecord record = _contents.get(childPath);
+ if (record == null) {
+ continue;
+ }
+ out.add(record);
+ if (stats != null) {
+ Stat stat = _statMap.get(childPath);
+ stats.add(stat != null ? stat : new Stat());
+ }
+ }
+ return out;
+ }
+
+ /**
+ * Five-arg getChildren overload used by Pinot via {@code
CommonConstants.Helix.ZkClient.RETRY_*}.
+ * The retry parameters don't apply to the in-memory fake; delegates to the
simpler signature
+ * above.
+ */
+ @Override
+ public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int
options, int retryCount,
+ int retryInterval) {
+ return getChildren(parentPath, stats, options);
+ }
+
@Override
public boolean exists(String path, int options) {
return _contents.containsKey(path);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index c0a64c5fc5f..a7ef8f42662 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -1077,8 +1077,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
initRealtimeOffsetAutoResetManager(periodicTasks);
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(_helixResourceManager, _config,
_executorService, _connectionManager);
- _retentionManager = new RetentionManager(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- brokerServiceHelper);
+ _retentionManager = createRetentionManager(brokerServiceHelper);
periodicTasks.add(_retentionManager);
_offlineSegmentValidationManager =
new OfflineSegmentValidationManager(_config, _helixResourceManager,
_leadControllerManager,
@@ -1136,6 +1135,17 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
periodicTasks.add(_realtimeOffsetAutoResetManager);
}
+ /**
+ * Factory hook for the controller's {@link RetentionManager}. Subclasses
override to install a
+ * deployment-specific retention manager (e.g. to extend the
untracked-segment sweep with names
+ * tracked outside the standard per-segment ZK znodes). The default
constructs the stock
+ * {@link RetentionManager}.
+ */
+ protected RetentionManager createRetentionManager(BrokerServiceHelper
brokerServiceHelper) {
+ return new RetentionManager(_helixResourceManager, _leadControllerManager,
_config, _controllerMetrics,
+ brokerServiceHelper);
+ }
+
/**
* Creates a TaskManager instance as specified in the configuration.
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 24d779e2507..4013379f722 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -318,7 +318,7 @@ public class PinotHelixResourceManager {
_helixDataAccessor = _helixZkManager.getHelixDataAccessor();
_keyBuilder = _helixDataAccessor.keyBuilder();
_controllerMetrics = controllerMetrics;
- _segmentDeletionManager = new SegmentDeletionManager(_dataDir,
_helixAdmin, _helixClusterName, _propertyStore,
+ _segmentDeletionManager = createSegmentDeletionManager(_dataDir,
_helixAdmin, _helixClusterName, _propertyStore,
_deletedSegmentsRetentionInDays);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore,
_isSingleTenantCluster);
@@ -404,6 +404,12 @@ public class PinotHelixResourceManager {
return _segmentDeletionManager;
}
+ protected SegmentDeletionManager createSegmentDeletionManager(String
dataDir, HelixAdmin helixAdmin,
+ String helixClusterName, ZkHelixPropertyStore<ZNRecord> propertyStore,
int deletedSegmentsRetentionInDays) {
+ return new SegmentDeletionManager(dataDir, helixAdmin, helixClusterName,
propertyStore,
+ deletedSegmentsRetentionInDays);
+ }
+
/**
* Get the Helix manager.
*
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 3d5680fae37..7d4944897c7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -124,7 +125,7 @@ public class SegmentDeletionManager {
_materializedViewConsistencyManager = materializedViewConsistencyManager;
}
- private void notifyMaterializedViewConsistencyManager(String tableName,
List<String> segmentsToDelete) {
+ protected void notifyMaterializedViewConsistencyManager(String tableName,
List<String> segmentsToDelete) {
MaterializedViewConsistencyManager mgr =
_materializedViewConsistencyManager;
if (mgr == null || segmentsToDelete.isEmpty()) {
return;
@@ -187,17 +188,51 @@ public class SegmentDeletionManager {
protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String
tableName, Collection<String> segmentIds,
Long deletedSegmentsRetentionMs, long deletionDelay) {
- // Check if segment got removed from ExternalView or IdealState
+ List<String> segmentsToDelete = filterSegmentsToDelete(tableName,
segmentIds);
+ if (segmentsToDelete == null) {
+ // ExternalView or IdealState was unavailable; skip the whole batch
+ return;
+ }
+
+ Set<String> deletedSegments = new HashSet<>();
+ if (!segmentsToDelete.isEmpty()) {
+ // Capture segment time ranges before ZK metadata is removed (for MV
dirty marking)
+ notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
+
+ // Notify all active listeners here
+ PinotSegmentLifecycleEventListenerManager.getInstance()
+ .notifyListeners(new SegmentDeletionEventDetails(tableName,
segmentsToDelete));
+
+ deletedSegments = deleteSegmentsFromPropertyStore(tableName,
segmentsToDelete);
+
+ // Best effort remove segments from deep store.
+ // If this fails (e.g. controller crashes, deep store unavailable),
future runs of RetentionManager
+ // will attempt to delete orphan deep store entries. Check
getSegmentsToDeleteFromDeepstore()
+ removeSegmentsFromStoreInBatch(tableName, deletedSegments,
deletedSegmentsRetentionMs);
+ }
+
+ Set<String> segmentsToRetryLater = new HashSet<>(segmentIds);
+ segmentsToRetryLater.removeAll(deletedSegments);
+
+ LOGGER.info("Deleted {} segments from table {}:{}",
deletedSegments.size(), tableName,
+ deletedSegments.size() <= 5 ? deletedSegments : "");
+
+ if (!segmentsToRetryLater.isEmpty()) {
+ rescheduleRetry(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, deletionDelay);
+ }
+ }
+
+ /// Check if segment got removed from ExternalView or IdealState
+ /// Returns `null` when the ExternalView or IdealState is unavailable
+ protected List<String> filterSegmentsToDelete(String tableName,
Collection<String> segmentIds) {
ExternalView externalView =
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
if (externalView == null || idealState == null) {
LOGGER.warn("Resource: {} is not set up in idealState or ExternalView,
won't do anything", tableName);
- return;
+ return null;
}
- List<String> segmentsToDelete = new ArrayList<>(segmentIds.size()); // Has
the segments that will be deleted
- Set<String> segmentsToRetryLater = new HashSet<>(segmentIds.size()); //
List of segments that we need to retry
-
+ List<String> segmentsToDelete = new ArrayList<>(segmentIds.size());
try {
for (String segmentId : segmentIds) {
Map<String, String> segmentToInstancesMapFromExternalView =
externalView.getStateMap(segmentId);
@@ -205,68 +240,60 @@ public class SegmentDeletionManager {
if ((segmentToInstancesMapFromExternalView == null ||
segmentToInstancesMapFromExternalView.isEmpty()) && (
segmentToInstancesMapFromIdealStates == null ||
segmentToInstancesMapFromIdealStates.isEmpty())) {
segmentsToDelete.add(segmentId);
- } else {
- segmentsToRetryLater.add(segmentId);
}
}
} catch (Exception e) {
LOGGER.warn("Caught exception while checking helix states for table:
{}", tableName, e);
segmentsToDelete.clear();
segmentsToDelete.addAll(segmentIds);
- segmentsToRetryLater.clear();
}
+ return segmentsToDelete;
+ }
- if (!segmentsToDelete.isEmpty()) {
- List<String> propStorePathList = new
ArrayList<>(segmentsToDelete.size());
- for (String segmentId : segmentsToDelete) {
- String segmentPropertyStorePath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
- propStorePathList.add(segmentPropertyStorePath);
- }
-
- // Capture segment time ranges before ZK metadata is removed (for MV
dirty marking)
- notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
-
- // Notify all active listeners here
- PinotSegmentLifecycleEventListenerManager.getInstance()
- .notifyListeners(new SegmentDeletionEventDetails(tableName,
segmentsToDelete));
+ /// Removes the property-store znodes for the given segments
+ /// Returns the set of segments that were successfully deleted.
+ protected Set<String> deleteSegmentsFromPropertyStore(String tableName,
List<String> segmentsToDelete) {
+ // Use a LinkedHashSet so deep-store deletion preserves the input order
(deterministic) rather
+ // than HashSet bucket order, while keeping the Set semantics callers rely
on.
+ Set<String> deletedSegments = new LinkedHashSet<>(segmentsToDelete.size());
+ List<String> propStorePathList = new ArrayList<>(segmentsToDelete.size());
+ for (String segmentId : segmentsToDelete) {
+ String segmentPropertyStorePath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
+ propStorePathList.add(segmentPropertyStorePath);
+ }
- boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList,
AccessOption.PERSISTENT);
- List<String> propStoreFailedSegs = new
ArrayList<>(segmentsToDelete.size());
- for (int i = 0; i < deleteSuccessful.length; i++) {
- final String segmentId = segmentsToDelete.get(i);
- if (!deleteSuccessful[i]) {
- // The batch remove API takes a non-recursive ZK path: it cannot
delete a znode that has
- // accumulated children. Fall back to the single-path remove API,
which falls back to a
- // recursive delete on the same NotEmpty failure. Skip when the
znode is already gone
- // (the batch call may have failed simply because the entry did not
exist).
- String segmentPath = propStorePathList.get(i);
- if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
- && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT))
{
- LOGGER.info("Could not delete {} from propertystore", segmentPath);
- segmentsToRetryLater.add(segmentId);
- propStoreFailedSegs.add(segmentId);
- }
+ boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList,
AccessOption.PERSISTENT);
+ for (int i = 0; i < deleteSuccessful.length; i++) {
+ final String segmentId = segmentsToDelete.get(i);
+ if (deleteSuccessful[i]) {
+ deletedSegments.add(segmentId);
+ } else {
+ // The batch remove API takes a non-recursive ZK path: it cannot
delete a znode that has
+ // accumulated children. Fall back to the single-path remove API,
which falls back to a
+ // recursive delete on the same NotEmpty failure. Skip when the znode
is already gone
+ // (the batch call may have failed simply because the entry did not
exist).
+ String segmentPath = propStorePathList.get(i);
+ if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
+ && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) {
+ LOGGER.info("Could not delete {} from propertystore", segmentPath);
+ } else {
+ deletedSegments.add(segmentId);
}
}
- segmentsToDelete.removeAll(propStoreFailedSegs);
-
- // TODO: If removing segments from deep store fails (e.g. controller
crashes, deep store unavailable), these
- // segments will become orphans and not easy to track because
their ZK metadata are already deleted.
- // Consider removing segments from deep store before cleaning up
the ZK metadata.
- removeSegmentsFromStoreInBatch(tableName, segmentsToDelete,
deletedSegmentsRetentionMs);
}
+ return deletedSegments;
+ }
- LOGGER.info("Deleted {} segments from table {}:{}",
segmentsToDelete.size(), tableName,
- segmentsToDelete.size() <= 5 ? segmentsToDelete : "");
-
- if (!segmentsToRetryLater.isEmpty()) {
- long effectiveDeletionDelay = Math.min(deletionDelay * 2,
MAX_DELETION_DELAY_SECONDS);
- LOGGER.info("Postponing deletion of {} segments from table {}",
segmentsToRetryLater.size(), tableName);
- deleteSegmentsWithDelay(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, effectiveDeletionDelay);
- }
+ /// Reschedules the segments that could not be deleted this pass, applying
the exponential back-off
+ /// (capped at [#MAX_DELETION_DELAY_SECONDS]). No-op when there is nothing
to retry.
+ protected void rescheduleRetry(String tableName, Collection<String>
segmentsToRetryLater,
+ Long deletedSegmentsRetentionMs, long deletionDelay) {
+ long effectiveDeletionDelay = Math.min(deletionDelay * 2,
MAX_DELETION_DELAY_SECONDS);
+ LOGGER.info("Postponing deletion of {} segments from table {}",
segmentsToRetryLater.size(), tableName);
+ deleteSegmentsWithDelay(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, effectiveDeletionDelay);
}
- public void removeSegmentsFromStoreInBatch(String tableNameWithType,
List<String> segments,
+ public void removeSegmentsFromStoreInBatch(String tableNameWithType,
Collection<String> segments,
@Nullable Long deletedSegmentsRetentionMs) {
if (_dataDir == null) {
LOGGER.info("dataDir is not configured, won't delete segment from disk
for table: {}", tableNameWithType);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 55de58eae34..f65166f5a31 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -312,6 +312,18 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
}
+ @VisibleForTesting
+ protected Set<String> getSegmentNames(String tableNameWithType,
+ List<SegmentZKMetadata> segmentZKMetadataList) {
+ return segmentZKMetadataList.stream()
+ .map(SegmentZKMetadata::getSegmentName)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ protected List<String> getSegmentNames(String tableNameWithType) {
+ return _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false);
+ }
+
private List<String> getSegmentsToDeleteFromDeepstore(String
tableNameWithType, RetentionStrategy retentionStrategy,
List<SegmentZKMetadata> segmentZKMetadataList, int
untrackedSegmentsDeletionBatchSize,
RetentionStrategy untrackedSegmentsRetentionStrategy) {
@@ -344,18 +356,11 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
return segmentsToDelete;
}
- Set<String> segmentsPresentInZK;
+ Set<String> segmentsPresentInZK = getSegmentNames(tableNameWithType,
segmentZKMetadataList);
if (isHybridTable) {
- segmentsPresentInZK = new HashSet<>();
// This must be the OFFLINE table
- segmentsPresentInZK.addAll(
-
segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toSet()));
// Add segments from the REALTIME table as well
- segmentsPresentInZK.addAll(
-
_pinotHelixResourceManager.getSegmentsFor(TableNameBuilder.REALTIME.tableNameWithType(rawTableName),
false));
- } else {
- segmentsPresentInZK =
-
segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toSet());
+
segmentsPresentInZK.addAll(getSegmentNames(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)));
}
try {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index c32459615db..9999612bd24 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -91,6 +91,13 @@ public class RetentionManagerTest {
private Path _tempDir;
private File _tableDir;
+ protected RetentionManager createRetentionManager(PinotHelixResourceManager
pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
+ BrokerServiceHelper brokerServiceHelper) {
+ return new RetentionManager(pinotHelixResourceManager,
leadControllerManager, config, controllerMetrics,
+ brokerServiceHelper);
+ }
+
@BeforeMethod
public void setUp() throws Exception {
// Setup for real file test
@@ -202,7 +209,7 @@ public class RetentionManagerTest {
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
RetentionManager retentionManager =
- new RetentionManager(pinotHelixResourceManager, leadControllerManager,
conf, controllerMetrics,
+ createRetentionManager(pinotHelixResourceManager,
leadControllerManager, conf, controllerMetrics,
brokerServiceHelper);
retentionManager.start();
retentionManager.run();
@@ -403,7 +410,7 @@ public class RetentionManagerTest {
PinotHelixResourceManager mockResourceManager =
mock(PinotHelixResourceManager.class);
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
- RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
+ RetentionManager retentionManager =
createRetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
controllerMetrics, brokerServiceHelper);
retentionManager.start();
retentionManager.run();
@@ -495,7 +502,7 @@ public class RetentionManagerTest {
// test
RetentionManager retentionManager =
- new RetentionManager(mockPinotHelixResourceManager, null,
controllerConf, mock(ControllerMetrics.class),
+ createRetentionManager(mockPinotHelixResourceManager, null,
controllerConf, mock(ControllerMetrics.class),
brokerServiceHelper);
retentionManager.manageRetentionForHybridTable(realtimeTableConfig,
offlineTableConfig);
@@ -613,7 +620,7 @@ public class RetentionManagerTest {
PinotHelixResourceManager mockResourceManager =
mock(PinotHelixResourceManager.class);
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
- RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
+ RetentionManager retentionManager =
createRetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
controllerMetrics, brokerServiceHelper);
retentionManager.start();
retentionManager.run();
@@ -651,7 +658,7 @@ public class RetentionManagerTest {
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
RetentionManager retentionManager =
- new RetentionManager(pinotHelixResourceManager, leadControllerManager,
conf, controllerMetrics,
+ createRetentionManager(pinotHelixResourceManager,
leadControllerManager, conf, controllerMetrics,
brokerServiceHelper);
retentionManager.start();
retentionManager.run();
@@ -698,7 +705,7 @@ public class RetentionManagerTest {
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
RetentionManager retentionManager =
- new RetentionManager(pinotHelixResourceManager, leadControllerManager,
conf, controllerMetrics,
+ createRetentionManager(pinotHelixResourceManager,
leadControllerManager, conf, controllerMetrics,
brokerServiceHelper);
retentionManager.findUntrackedSegmentsToDeleteFromDeepstore("table1_REALTIME",
null, segmentsToExclude, null);
@@ -871,7 +878,7 @@ public class RetentionManagerTest {
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
RetentionManager retentionManager =
- new RetentionManager(mockResourceManager,
mock(LeadControllerManager.class), conf, controllerMetrics,
+ createRetentionManager(mockResourceManager,
mock(LeadControllerManager.class), conf, controllerMetrics,
brokerServiceHelper);
// Default should be false
@@ -954,7 +961,7 @@ public class RetentionManagerTest {
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(mockResourceManager, conf, null, null);
RetentionManager retentionManager =
- new RetentionManager(pinotHelixResourceManager, leadControllerManager,
conf, controllerMetrics,
+ createRetentionManager(pinotHelixResourceManager,
leadControllerManager, conf, controllerMetrics,
brokerServiceHelper);
retentionManager.start();
retentionManager.run();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index c65162123cb..bf36f7a8660 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -82,6 +82,11 @@ public class SegmentDeletionManagerTest {
RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
}
+ protected SegmentDeletionManager createDeletionManager(String dataDir,
HelixAdmin helixAdmin, String clusterName,
+ ZkHelixPropertyStore<ZNRecord> propertyStore, int
deletedSegmentsRetentionInDays) {
+ return new SegmentDeletionManager(dataDir, helixAdmin, clusterName,
propertyStore, deletedSegmentsRetentionInDays);
+ }
+
HelixAdmin makeHelixAdmin() {
HelixAdmin admin = mock(HelixAdmin.class);
ExternalView ev = mock(ExternalView.class);
@@ -387,7 +392,7 @@ public class SegmentDeletionManagerTest {
ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
File tempDir = Files.createTempDirectory("pinot-test-").toFile();
tempDir.deleteOnExit();
- SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+ SegmentDeletionManager deletionManager = createDeletionManager(
tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
// create table segment files.
@@ -443,7 +448,7 @@ public class SegmentDeletionManagerTest {
ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
File tempDir = Files.createTempDirectory("pinot-test-").toFile();
tempDir.deleteOnExit();
- SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+ SegmentDeletionManager deletionManager = createDeletionManager(
tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
// create table segment files.
@@ -512,7 +517,7 @@ public class SegmentDeletionManagerTest {
ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
File tempDir = Files.createTempDirectory("pinot-test-").toFile();
tempDir.deleteOnExit();
- SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+ SegmentDeletionManager deletionManager = createDeletionManager(
tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
// create table segment files.
@@ -602,7 +607,7 @@ public class SegmentDeletionManagerTest {
}
@Override
- public void removeSegmentsFromStoreInBatch(String tableNameWithType,
List<String> segments,
+ public void removeSegmentsFromStoreInBatch(String tableNameWithType,
Collection<String> segments,
@Nullable Long deletedSegmentsRetentionMs) {
_segmentsRemovedFromStore.addAll(segments);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]