This is an automated email from the ASF dual-hosted git repository.
noob-se7en pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e76da0e6870 Refactor BaseTableDataManager (#18381)
e76da0e6870 is described below
commit e76da0e6870f0f0486f5e5c37ad6494f076995e0
Author: Krishan Goyal <[email protected]>
AuthorDate: Tue May 19 17:36:58 2026 +0530
Refactor BaseTableDataManager (#18381)
* Refactor BaseTableDataManager so multi-segment managers can compose its
building blocks
Three small refactors that keep single-segment behavior identical and let a
future multi-segment SegmentDataManager (e.g. a wrapper around N constituent
segments) reuse the same load and reload primitives without forking them:
1. Extract `protected ImmutableSegment loadSegment(zkMetadata, ilc)` from
`downloadAndLoadSegment`. The new helper performs only the download +
`ImmutableSegmentLoader.load`, returning the segment without registering
it
in `_segmentDataManagerMap` or invoking upsert hooks. Single-segment
callers
continue to use `downloadAndLoadSegment`, which now composes the helper +
`addSegment(...)`. This lets a multi-segment manager load all of its
members
first and register a single wrapper entry under one name.
2. Push `_segmentReloadSemaphore` acquire/release down into
`reloadSegment(SegmentDataManager, IndexLoadingConfig, boolean)`. The
public
`reloadSegment(String)` and the private parallel
`reloadSegments(List<SDM>)`
both used to wrap the inner call with the semaphore; that acquire is now
inside the per-physical-segment body and the outer wrappers are removed
(which would otherwise double-acquire on a non-reentrant semaphore). For
non-group tables this is structurally identical (one segment -> one
acquire
-> one release; same concurrency bound). For multi-segment managers that
fan out N reloads, each member contends for a slot independently.
3. Drop `@VisibleForTesting` on `isSegmentStale(IndexLoadingConfig,
SegmentDataManager)` and widen to plain `protected` so subclasses can
call
it from group-aware overrides of `getStaleSegments` /
`needReloadSegments`.
The semaphore stays at the orchestration boundary in `doReplaceSegment` (not
relocated into `replaceSegmentIfCrcMismatch`), because subclasses commonly
override `replaceSegmentIfCrcMismatch` and a relocation there would leak the
acquire across paths that bypass the override; subclasses needing per-member
acquire on a multi-segment replace can wrap the call themselves.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Cleanup comment
* Refactor BaseTDM and its tests to enable cleaner extensions, abstractions
and ownership for future work
* Cleanup deleteSegment
* Move offloadSegment to deleteSegment
* Reload should take lock first and then acquire semaphore
* Cleanup BaseTDM
* Acquire then take segment lock in reload
* Acquire reload semaphore for realtime force commit too
* Make some accessors public in TDM
* Make some accessors public in TDM
* Avoid sharing _segmentDataManagerMap publicly
* Reload Test case fixes
* Add createHelixManagerMock
* Rename to tryLoadExistingSegmentWithoutRegistering
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../core/data/manager/BaseTableDataManager.java | 151 +++++++++++++++++----
.../BaseTableDataManagerAcquireSegmentTest.java | 98 ++++++++-----
.../BaseTableDataManagerNeedRefreshTest.java | 103 +++++++-------
.../data/manager/BaseTableDataManagerTest.java | 151 ++++++++++++++-------
.../local/data/manager/SegmentDataManager.java | 2 -
.../local/data/manager/TableDataManager.java | 7 +
.../starter/helix/HelixInstanceDataManager.java | 43 ++----
7 files changed, 369 insertions(+), 186 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 2bd134a9e52..e0b2e76980b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -606,6 +607,56 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
}
+ @Override
+ public void deleteSegment(String segmentName)
+ throws Exception {
+ _logger.info("Deleting segment: {}", segmentName);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ if (hasSegment(segmentName)) {
+ _logger.warn("Segment: {} is still loaded, offloading it before
delete", segmentName);
+ offloadSegment(segmentName);
+ }
+ doDeleteSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception
while deleting segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ protected void doDeleteSegment(String segmentName)
+ throws Exception {
+ deleteSegmentFilesFromDisk(_tableDataDir, segmentName,
_instanceDataManagerConfig);
+ _logger.info("Deleted segment: {}", segmentName);
+ }
+
+ /**
+ * Removes the segment directory locally and does tier-aware cleanup too
+ */
+ public static void deleteSegmentFilesFromDisk(String tableDataDir, String
segmentName,
+ InstanceDataManagerConfig instanceConfig)
+ throws Exception {
+ File segmentDir = new File(tableDataDir, segmentName);
+ if (segmentDir.exists()) {
+ FileUtils.deleteQuietly(segmentDir);
+ LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
+ }
+ SegmentDirectoryLoader segmentLoader =
+
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader());
+ if (segmentLoader != null) {
+ LOGGER.info("Deleting segment: {} further with segment loader: {}",
segmentName,
+ instanceConfig.getSegmentDirectoryLoader());
+ SegmentDirectoryLoaderContext ctx = new
SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
+ .setTableDataDir(tableDataDir)
+ .build();
+ segmentLoader.delete(ctx);
+ }
+ }
+
@Override
public void reloadSegment(String segmentName, boolean forceDownload, String
reloadJobId)
throws Exception {
@@ -616,12 +667,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
- _segmentReloadSemaphore.acquire(segmentName, _logger);
- try {
- reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
- } finally {
- _segmentReloadSemaphore.release();
- }
+ reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} else {
_logger.warn("Failed to find segment: {}, skipping reloading it",
segmentName);
}
@@ -771,6 +817,36 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return _segmentPreloadExecutor;
}
+ public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
+ return _propertyStore;
+ }
+
+ @Nullable
+ public SegmentDataManager getSegmentDataManager(String segmentName) {
+ return _segmentDataManagerMap.get(segmentName);
+ }
+
+ public Collection<SegmentDataManager> getAllSegmentDataManagers() {
+ return Collections.unmodifiableCollection(_segmentDataManagerMap.values());
+ }
+
+ public Logger getLogger() {
+ return _logger;
+ }
+
+ public SegmentReloadSemaphore getSegmentReloadSemaphore() {
+ return _segmentReloadSemaphore;
+ }
+
+ @Nullable
+ public SegmentOperationsThrottlerSet getSegmentOperationsThrottlerSet() {
+ return _segmentOperationsThrottlerSet;
+ }
+
+ public ServerMetrics getServerMetrics() {
+ return _serverMetrics;
+ }
+
@Override
public boolean isDeleted() {
return _isDeleted;
@@ -947,12 +1023,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager ->
CompletableFuture.runAsync(() -> {
String segmentName = segmentDataManager.getSegmentName();
try {
- _segmentReloadSemaphore.acquire(segmentName, _logger);
- try {
- reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
- } finally {
- _segmentReloadSemaphore.release();
- }
+ reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
} catch (Throwable t) {
_logger.error("Caught exception while reloading segment: {}",
segmentName, t);
failedSegments.add(segmentName);
@@ -994,7 +1065,12 @@ public abstract class BaseTableDataManager implements
TableDataManager {
+ "Change the cluster config: {} to `PROTECTED` for safer
commit", segmentName, config.getConfigKey());
} else {
_logger.info("Reloading (force committing) consuming segment: {}",
segmentName);
- ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+ _segmentReloadSemaphore.acquire(segmentName, _logger);
+ try {
+ ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+ } finally {
+ _segmentReloadSemaphore.release();
+ }
}
} else {
_logger.warn("Skip reloading consuming segment: {} as configured",
segmentName);
@@ -1014,9 +1090,17 @@ public abstract class BaseTableDataManager implements
TableDataManager {
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
File indexDir = getSegmentDataDir(segmentName, segmentTier,
indexLoadingConfig.getTableConfig());
+ _segmentReloadSemaphore.acquire(segmentName, _logger);
Lock segmentLock = getSegmentLock(segmentName);
segmentLock.lock();
try {
+ // Re-validate under the segment lock: the caller's map read in
reloadSegment(String, boolean, String) is
+ // unlocked, so a concurrent offloadSegment can remove the entry between
that read and this lock acquisition.
+ // Without this guard, reload would resurrect a segment the cluster has
already dropped.
+ if (_segmentDataManagerMap.get(segmentName) == null) {
+ _logger.warn("Skipping reload for segment: {} — concurrently offloaded
after dispatch", segmentName);
+ return;
+ }
/*
Determines if a segment should be downloaded from deep storage based on:
1. A forced download flag.
@@ -1092,6 +1176,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
throw reloadFailureException;
} finally {
segmentLock.unlock();
+ _segmentReloadSemaphore.release();
}
_logger.info("Reloaded segment: {}", segmentName);
}
@@ -1116,7 +1201,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
* existing segment with the same name.
*/
@Nullable
- protected SegmentDataManager registerSegment(String segmentName,
SegmentDataManager segmentDataManager) {
+ public SegmentDataManager registerSegment(String segmentName,
SegmentDataManager segmentDataManager) {
SegmentDataManager oldSegmentDataManager;
synchronized (_segmentDataManagerMap) {
oldSegmentDataManager = _segmentDataManagerMap.put(segmentName,
segmentDataManager);
@@ -1133,7 +1218,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
* identify this scenario.
*/
@Nullable
- protected SegmentDataManager unregisterSegment(String segmentName) {
+ public SegmentDataManager unregisterSegment(String segmentName) {
_recentlyDeletedSegments.put(segmentName, segmentName);
synchronized (_segmentDataManagerMap) {
return _segmentDataManagerMap.remove(segmentName);
@@ -1145,7 +1230,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
* Segment can be downloaded from deep store or from peer servers.
Downloaded segment might be compressed or
* encrypted, and this method takes care of decompressing and decrypting the
segment.
*/
- protected File downloadSegment(SegmentZKMetadata zkMetadata)
+ public File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
String segmentName = zkMetadata.getSegmentName();
String downloadUrl = zkMetadata.getDownloadUrl();
@@ -1376,7 +1461,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
* The original index directory is restored lazily, as depending on the
conditions,
* it may be restored from the backup directory or segment downloaded from
deep store.
*/
- protected void createBackup(File indexDir) {
+ public void createBackup(File indexDir) {
if (!indexDir.exists()) {
return;
}
@@ -1393,7 +1478,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
* When we rename the segment backup directory to segment temporary
directory, we know the reload
* already succeeded, so that we can safely delete the segment temporary
directory.
*/
- protected void removeBackup(File indexDir)
+ public void removeBackup(File indexDir)
throws IOException {
File parentDir = indexDir.getParentFile();
File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
@@ -1410,6 +1495,22 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
+ ImmutableSegment segment =
tryLoadExistingSegmentWithoutRegistering(zkMetadata, indexLoadingConfig);
+ if (segment == null) {
+ return false;
+ }
+ addSegment(segment, zkMetadata);
+ return true;
+ }
+
+ /**
+ * Just Loads a segment from the existing on-disk copy without registering
it in {@code _segmentDataManagerMap} or
+ * invoking other hooks.
+ * Returns {@code null} when the on-disk copy is absent, has a stale CRC
under or fails to load
+ */
+ @Nullable
+ public ImmutableSegment
tryLoadExistingSegmentWithoutRegistering(SegmentZKMetadata zkMetadata,
+ IndexLoadingConfig indexLoadingConfig) {
String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot load existing
segment: %s of table: %s", segmentName,
@@ -1442,14 +1543,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
if (segmentMetadata == null) {
_logger.info("Segment: {} does not exist", segmentName);
closeSegmentDirectoryQuietly(segmentDirectory);
- return false;
+ return null;
}
if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata,
segmentMetadata)) {
_logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName,
segmentMetadata.getCrc(),
zkMetadata.getCrc());
if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) {
closeSegmentDirectoryQuietly(segmentDirectory);
- return false;
+ return null;
}
_logger.info("Skipping CRC check for segment: {} as configured. Proceed
to load segment.", segmentName);
}
@@ -1470,15 +1571,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
indexLoadingConfig, zkMetadata);
}
ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory,
indexLoadingConfig);
- addSegment(segment, zkMetadata);
_logger.info("Loaded existing segment: {} with CRC: {} on tier: {}",
segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier));
- return true;
+ return segment;
} catch (Exception e) {
_logger.error("Failed to load existing segment: {} with CRC: {} on tier:
{}", segmentName, zkMetadata.getCrc(),
TierConfigUtils.normalizeTierName(segmentTier), e);
closeSegmentDirectoryQuietly(segmentDirectory);
- return false;
+ return null;
}
}
@@ -1542,8 +1642,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return staleSegments;
}
- @VisibleForTesting
- StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig,
SegmentDataManager segmentDataManager) {
+ public StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig,
SegmentDataManager segmentDataManager) {
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
Schema schema = indexLoadingConfig.getSchema();
assert tableConfig != null && schema != null;
@@ -1847,7 +1946,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// CRC check can be performed on both segment CRC and data CRC (if
available) based on the ZK property value of
// useDataCRC.
- protected static boolean hasSameCRC(SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata) {
+ public static boolean hasSameCRC(SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata) {
if (zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc())) {
return true;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 519a3e8ff9f..a6d48b56e06 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -31,7 +31,6 @@ import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -73,7 +72,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
private final Set<ImmutableSegment> _allSegments = new HashSet<>();
private final Set<SegmentDataManager> _accessedSegManagers =
ConcurrentHashMap.newKeySet();
private final Set<SegmentDataManager> _allSegManagers =
ConcurrentHashMap.newKeySet();
- private Map<String, ImmutableSegmentDataManager> _internalSegMap;
+ protected Map<String, SegmentDataManager> _internalSegMap;
private Throwable _exception;
private Thread _masterThread;
// Segment numbers in place.
@@ -115,7 +114,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
_masterThread = null;
}
- private TableDataManager makeTestableManager()
+ protected TableDataManager makeTestableManager()
throws Exception {
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(_tmpDir.getAbsolutePath());
@@ -129,18 +128,51 @@ public class BaseTableDataManagerAcquireSegmentTest {
new SegmentOperationsThrottler(4, 8, true),
new SegmentOperationsThrottler(10, 20, true),
new SegmentOperationsThrottler(4, 8, true));
- TableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), tableConfig, schema,
+ TableDataManager tableDataManager = newTableDataManager();
+ tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(),
new SegmentLocks(), tableConfig, schema,
new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, segmentOperationsThrottlerSet,
false, mock(ServerReloadJobStatusCache.class));
tableDataManager.start();
Field segsMapField =
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
- _internalSegMap = (Map<String, ImmutableSegmentDataManager>)
segsMapField.get(tableDataManager);
+ _internalSegMap = (Map<String, SegmentDataManager>)
segsMapField.get(tableDataManager);
return tableDataManager;
}
- private ImmutableSegment makeImmutableSegment(String segmentName, int
totalDocs) {
+ /**
+ * Returns the concrete {@link TableDataManager} instance under test.
Default returns a stock
+ * {@link OfflineTableDataManager}; subclasses override to test a different
implementation while inheriting all
+ * test bodies.
+ */
+ protected TableDataManager newTableDataManager() {
+ return new OfflineTableDataManager();
+ }
+
+ /**
+ * Returns the {@link HelixManager} mock wired into the TDM under test.
Default returns a bare Mockito mock
+ * — fine for the inherited test bodies which do not read cluster config or
property store directly.
+ * Subclasses that exercise paths reading those override to stub them.
+ */
+ protected HelixManager createHelixManagerMock() {
+ return mock(HelixManager.class);
+ }
+
+ /**
+ * Returns the TDM-map key under which a segment with the given name is
registered.
+ */
+ protected String tdmKey(String segmentName) {
+ return segmentName;
+ }
+
+ protected void addSegment(TableDataManager tdm, ImmutableSegment seg) {
+ tdm.addSegment(seg);
+ }
+
+ protected ImmutableSegment getInnerSegment(SegmentDataManager sdm) {
+ return (ImmutableSegment) sdm.getSegment();
+ }
+
+ protected ImmutableSegment makeImmutableSegment(String segmentName, int
totalDocs) {
ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
@@ -164,11 +196,11 @@ public class BaseTableDataManagerAcquireSegmentTest {
// Add the segment, get it for use, remove the segment, and then return it.
// Make sure that the segment is not destroyed before return.
ImmutableSegment immutableSegment = makeImmutableSegment(segmentName,
totalDocs);
- tableDataManager.addSegment(immutableSegment);
+ addSegment(tableDataManager, immutableSegment);
Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
+ SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(tdmKey(segmentName));
Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
- tableDataManager.offloadSegment(segmentName);
+ tableDataManager.offloadSegment(tdmKey(segmentName));
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
Assert.assertEquals(_nDestroys, 0);
@@ -177,7 +209,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
Assert.assertEquals(_nDestroys, 1);
// Now the segment should not be available for use.Also, returning a null
reader is fine
- segmentDataManager = tableDataManager.acquireSegment(segmentName);
+ segmentDataManager = tableDataManager.acquireSegment(tdmKey(segmentName));
Assert.assertNull(segmentDataManager);
List<SegmentDataManager> segmentDataManagers =
tableDataManager.acquireAllSegments();
Assert.assertEquals(segmentDataManagers.size(), 0);
@@ -185,26 +217,26 @@ public class BaseTableDataManagerAcquireSegmentTest {
// If a caller tries to acquire the deleted segment using acquireSegments,
it will be returned in
// notAcquiredSegments. The isSegmentDeletedRecently method should return
true.
List<String> notAcquiredSegments = new ArrayList<>();
- tableDataManager.acquireSegments(List.of(segmentName),
notAcquiredSegments);
+ tableDataManager.acquireSegments(List.of(tdmKey(segmentName)),
notAcquiredSegments);
Assert.assertEquals(notAcquiredSegments.size(), 1);
- Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(segmentName));
+
Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName)));
// Adding and removing the segment again is fine. After adding the segment
back, isSegmentDeletedRecently should
// return false.
- tableDataManager.addSegment(immutableSegment);
- Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(segmentName));
- tableDataManager.offloadSegment(segmentName);
+ addSegment(tableDataManager, immutableSegment);
+
Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName)));
+ tableDataManager.offloadSegment(tdmKey(segmentName));
// Removing the segment again is fine.
- tableDataManager.offloadSegment(segmentName);
+ tableDataManager.offloadSegment(tdmKey(segmentName));
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
// Add a new segment and remove it in order this time.
final String anotherSeg = "AnotherSegment";
ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
- tableDataManager.addSegment(ix1);
+ addSegment(tableDataManager, ix1);
Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
+ SegmentDataManager sdm1 =
tableDataManager.acquireSegment(tdmKey(anotherSeg));
Assert.assertNotNull(sdm1);
Assert.assertEquals(sdm1.getReferenceCount(), 2);
// acquire all segments
@@ -220,15 +252,15 @@ public class BaseTableDataManagerAcquireSegmentTest {
Assert.assertEquals(sdm1.getReferenceCount(), 1);
// Now replace the segment with another one.
ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
- tableDataManager.addSegment(ix2);
+ addSegment(tableDataManager, ix2);
Assert.assertEquals(tableDataManager.getNumSegments(), 1);
// Now the previous one should have been destroyed, and
Assert.assertEquals(sdm1.getReferenceCount(), 0);
verify(ix1, times(1)).destroy();
// Delete ix2 without accessing it.
- SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg);
+ SegmentDataManager sdm2 = _internalSegMap.get(tdmKey(anotherSeg));
Assert.assertEquals(sdm2.getReferenceCount(), 1);
- tableDataManager.offloadSegment(anotherSeg);
+ tableDataManager.offloadSegment(tdmKey(anotherSeg));
Assert.assertEquals(tableDataManager.getNumSegments(), 0);
Assert.assertEquals(sdm2.getReferenceCount(), 0);
verify(ix2, times(1)).destroy();
@@ -265,8 +297,8 @@ public class BaseTableDataManagerAcquireSegmentTest {
for (int i = _lo; i <= _hi; i++) {
final String segName = SEGMENT_PREFIX + i;
- tableDataManager.addSegment(makeImmutableSegment(segName,
_random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ addSegment(tableDataManager, makeImmutableSegment(segName,
_random.nextInt()));
+ _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
}
runStorageServer(numQueryThreads, runTimeSec, tableDataManager); //
replaces segments while online
@@ -315,14 +347,14 @@ public class BaseTableDataManagerAcquireSegmentTest {
for (SegmentDataManager segmentDataManager : _internalSegMap.values()) {
Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
// We should never have called destroy on these segments. Remove it from
the list of accessed segments.
- verify(segmentDataManager.getSegment(), never()).destroy();
+ verify(getInnerSegment(segmentDataManager), never()).destroy();
_allSegManagers.remove(segmentDataManager);
_accessedSegManagers.remove(segmentDataManager);
}
// For the remaining segments in accessed list, destroy must have been
called exactly once.
for (SegmentDataManager segmentDataManager : _allSegManagers) {
- verify(segmentDataManager.getSegment(), times(1)).destroy();
+ verify(getInnerSegment(segmentDataManager), times(1)).destroy();
// Also their count should be 0
Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
}
@@ -361,7 +393,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
Set<Integer> segmentIds = pickSegments();
List<String> segmentList = new ArrayList<>(segmentIds.size());
for (Integer segmentId : segmentIds) {
- segmentList.add(SEGMENT_PREFIX + segmentId);
+ segmentList.add(tdmKey(SEGMENT_PREFIX + segmentId));
}
segmentDataManagers =
_tableDataManager.acquireSegments(segmentList, new ArrayList<>());
}
@@ -448,8 +480,9 @@ public class BaseTableDataManagerAcquireSegmentTest {
private void addSegment() {
final int segmentToAdd = _hi + 1;
final String segName = SEGMENT_PREFIX + segmentToAdd;
- _tableDataManager.addSegment(makeImmutableSegment(segName,
_random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager,
+ makeImmutableSegment(segName, _random.nextInt()));
+ _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
_hi = segmentToAdd;
}
@@ -457,8 +490,9 @@ public class BaseTableDataManagerAcquireSegmentTest {
private void replaceSegment() {
int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
final String segName = SEGMENT_PREFIX + segToReplace;
- _tableDataManager.addSegment(makeImmutableSegment(segName,
_random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager,
+ makeImmutableSegment(segName, _random.nextInt()));
+ _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
}
// Remove the segment _lo and then bump _lo
@@ -466,7 +500,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
throws Exception {
// Keep at least one segment in place.
if (_hi > _lo) {
- _tableDataManager.offloadSegment(SEGMENT_PREFIX + _lo);
+ _tableDataManager.offloadSegment(tdmKey(SEGMENT_PREFIX + _lo));
_lo++;
} else {
addSegment();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
index 61657b8e5fd..c1ef555a3af 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -52,6 +52,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -64,7 +65,7 @@ import static org.testng.Assert.assertTrue;
@Test
-public class BaseTableDataManagerNeedRefreshTest {
+public class BaseTableDataManagerNeedRefreshTest extends
BaseTableDataManagerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"BaseTableDataManagerNeedRefreshTest");
private static final String DEFAULT_TABLE_NAME = "mytable";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
@@ -91,7 +92,7 @@ public class BaseTableDataManagerNeedRefreshTest {
private static final Schema SCHEMA;
private static final IndexLoadingConfig INDEX_LOADING_CONFIG;
private static final ImmutableSegmentDataManager
IMMUTABLE_SEGMENT_DATA_MANAGER;
- private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
+ private BaseTableDataManager _baseTableDataManager;
private String _testName = "defaultTestName";
@@ -102,12 +103,16 @@ public class BaseTableDataManagerNeedRefreshTest {
INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
IMMUTABLE_SEGMENT_DATA_MANAGER =
createImmutableSegmentDataManager(INDEX_LOADING_CONFIG,
"basicSegment", generateRows());
- BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ @BeforeClass
+ public void initBaseTableDataManager() {
+ _baseTableDataManager = createTableManager();
+ }
+
protected static TableConfigBuilder getTableConfigBuilder() {
return new
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
.setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
@@ -198,7 +203,7 @@ public class BaseTableDataManagerNeedRefreshTest {
when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
File indexDir = createSegment(indexLoadingConfig, segmentName, rows);
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir,
indexLoadingConfig,
- BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER);
+ SEGMENT_OPERATIONS_THROTTLER);
when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
return segmentDataManager;
}
@@ -233,7 +238,7 @@ public class BaseTableDataManagerNeedRefreshTest {
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(indexLoadingConfig, "noChanges",
List.of(row));
- BaseTableDataManager tableDataManager =
BaseTableDataManagerTest.createTableManager();
+ BaseTableDataManager tableDataManager = createTableManager();
StaleSegment response =
tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager);
assertFalse(response.isStale());
@@ -247,7 +252,7 @@ public class BaseTableDataManagerNeedRefreshTest {
@Test
void testChangeTimeColumn() {
TableConfig tableConfig =
getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "time column");
@@ -258,7 +263,7 @@ public class BaseTableDataManagerNeedRefreshTest {
throws Exception {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "column deleted: textColumn");
@@ -270,7 +275,7 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.STRING, true));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "field type changed: textColumn");
@@ -282,7 +287,7 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.INT, true));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "data type changed: textColumn");
@@ -294,7 +299,7 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN,
FieldSpec.DataType.STRING, false));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "single / multi value changed:
textColumn");
@@ -306,7 +311,7 @@ public class BaseTableDataManagerNeedRefreshTest {
Schema schema = getSchema();
schema.removeField(TEXT_INDEX_COLUMN_MV);
schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV,
FieldSpec.DataType.STRING, true));
- StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
+ StaleSegment response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(TABLE_CONFIG, schema),
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "single / multi value changed:
textColumnMV");
@@ -317,12 +322,12 @@ public class BaseTableDataManagerNeedRefreshTest {
// Check with a column that is not sorted
TableConfig tableConfig =
getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, SCHEMA);
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "sort column changed:
MilliSecondsSinceEpoch");
// Check with a column that is sorted
tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN));
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@DataProvider(name = "testFilterArgs")
@@ -357,17 +362,17 @@ public class BaseTableDataManagerNeedRefreshTest {
createImmutableSegmentDataManager(indexLoadingConfig, segmentName,
generateRows());
// When TableConfig has a filter but segment does not have, needRefresh is
true.
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), expectedReason);
// When TableConfig does not have a filter but segment has, needRefresh is
true
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithFilter);
+ response = _baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithFilter);
assertTrue(response.isStale());
assertEquals(response.getReason(), expectedReason);
// When TableConfig has a filter AND segment also has a filter,
needRefresh is false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
segmentWithFilter).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig,
segmentWithFilter).isStale());
}
@Test
@@ -380,17 +385,17 @@ public class BaseTableDataManagerNeedRefreshTest {
createImmutableSegmentDataManager(indexLoadingConfig,
"partitionWithModulo", generateRows());
// when segment has no partition AND tableConfig has partitions then
needRefresh = true
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "partition function added:
partitionedColumn");
// when segment has partitions AND tableConfig has no partitions, then
needRefresh = false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithPartition).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithPartition).isStale());
// when # of partitions is different, then needRefresh = true
TableConfig partitionedTableConfig40 =
getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new
ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build();
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
+ response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
segmentWithPartition);
assertTrue(response.isStale());
assertEquals(response.getReason(), "num partitions changed:
partitionedColumn");
@@ -399,7 +404,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig partitionedTableConfigMurmur =
getTableConfigBuilder().setSegmentPartitionConfig(
new SegmentPartitionConfig(
Map.of(PARTITIONED_COLUMN_NAME, new
ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build();
- response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
+ response = _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
segmentWithPartition);
assertTrue(response.isStale());
assertEquals(response.getReason(), "partition function name changed:
partitionedColumn");
@@ -414,12 +419,12 @@ public class BaseTableDataManagerNeedRefreshTest {
createImmutableSegmentDataManager(indexLoadingConfig,
"withoutNullHandling", generateRows());
// If null handling is removed from table config AND segment has NVV, then
NVV can be removed. needRefresh = true
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfig,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "null value vector index removed from
column: NullValueColumn");
// if NVV is added to table config AND segment does not have NVV, then it
cannot be added. needRefresh = false
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithoutNullHandling).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG,
segmentWithoutNullHandling).isStale());
}
@Test
@@ -428,7 +433,7 @@ public class BaseTableDataManagerNeedRefreshTest {
new MultiColumnTextIndexConfig(List.of(MC_TEXT_INDEX_COLUMN_1,
MC_TEXT_INDEX_COLUMN_2));
TableConfig tableConfig =
getTableConfigBuilder().setMultiColumnTextIndexConfig(newIndex).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
+ assertTrue(_baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@@ -448,7 +453,7 @@ public class BaseTableDataManagerNeedRefreshTest {
.build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -470,7 +475,7 @@ public class BaseTableDataManagerNeedRefreshTest {
.build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -492,7 +497,7 @@ public class BaseTableDataManagerNeedRefreshTest {
.build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -504,7 +509,7 @@ public class BaseTableDataManagerNeedRefreshTest {
new StarTreeIndexConfig(List.of("Carrier"), null,
List.of(AggregationFunctionColumnPair.COUNT_STAR_NAME), null,
100);
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
+ assertTrue(_baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(tableConfig, SCHEMA),
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
}
@@ -529,7 +534,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -552,7 +557,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -575,7 +580,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -598,7 +603,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -621,7 +626,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertFalse(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -642,7 +647,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -662,7 +667,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -683,7 +688,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertFalse(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -705,7 +710,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -727,7 +732,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -751,7 +756,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertFalse(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -771,7 +776,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -784,7 +789,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig tableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig,
SCHEMA), _testName, generateRows());
- assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG,
segmentDataManager).isStale());
+ assertTrue(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG,
segmentDataManager).isStale());
}
@Test
@@ -805,7 +810,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig,
newStarTreeIndexConfig)).build();
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -824,7 +829,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -840,7 +845,7 @@ public class BaseTableDataManagerNeedRefreshTest {
IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, SCHEMA);
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(indexLoadingConfig, _testName,
generateRows());
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
segmentDataManager).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig,
segmentDataManager).isStale());
}
@Test
@@ -859,7 +864,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig newTableConfig =
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
assertTrue(
- BASE_TABLE_DATA_MANAGER.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+ _baseTableDataManager.isSegmentStale(new
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
.isStale());
}
@@ -877,7 +882,7 @@ public class BaseTableDataManagerNeedRefreshTest {
ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(indexLoadingConfig, _testName,
generateRows());
- assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig,
segmentDataManager).isStale());
+ assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig,
segmentDataManager).isStale());
}
@Test
@@ -894,7 +899,7 @@ public class BaseTableDataManagerNeedRefreshTest {
// Segment was created without the timestamp index.
StaleSegment response =
-
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigWithTimestampIndex,
IMMUTABLE_SEGMENT_DATA_MANAGER);
+
_baseTableDataManager.isSegmentStale(indexLoadingConfigWithTimestampIndex,
IMMUTABLE_SEGMENT_DATA_MANAGER);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
@@ -917,7 +922,7 @@ public class BaseTableDataManagerNeedRefreshTest {
// Evaluate staleness against a fresh config with no timestamp index.
IndexLoadingConfig indexLoadingConfigNoTimestamp = new
IndexLoadingConfig(getTableConfigBuilder().build(),
getSchema());
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigNoTimestamp,
segmentDataManager);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfigNoTimestamp,
segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
@@ -942,7 +947,7 @@ public class BaseTableDataManagerNeedRefreshTest {
getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDayAndWeek)).build();
IndexLoadingConfig indexLoadingConfigDayAndWeek = new
IndexLoadingConfig(tableConfigDayAndWeek, getSchema());
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDayAndWeek,
segmentDataManager);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfigDayAndWeek,
segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
@@ -968,7 +973,7 @@ public class BaseTableDataManagerNeedRefreshTest {
TableConfig tableConfigDay =
getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDay)).build();
IndexLoadingConfig indexLoadingConfigDay = new
IndexLoadingConfig(tableConfigDay, getSchema());
- StaleSegment response =
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDay,
segmentDataManager);
+ StaleSegment response =
_baseTableDataManager.isSegmentStale(indexLoadingConfigDay, segmentDataManager);
assertTrue(response.isStale());
assertEquals(response.getReason(), "timestamp index changed");
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index a21c210616c..88d7565c306 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -155,10 +155,14 @@ public class BaseTableDataManagerTest {
SegmentMetadata localMetadata = mock(SegmentMetadata.class);
when(localMetadata.getCrc()).thenReturn("0");
- BaseTableDataManager tableDataManager = createTableManager();
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
assertFalse(dataDir.exists());
- tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(),
zkMetadata, localMetadata, false);
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
assertTrue(dataDir.exists());
assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
}
@@ -175,18 +179,24 @@ public class BaseTableDataManagerTest {
when(localMetadata.getCrc()).thenReturn("0");
// No dataDir for coolTier, thus stay on default tier.
- BaseTableDataManager tableDataManager = createTableManager();
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
+ .when(tableDataManager).fetchIndexLoadingConfig();
File defaultDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
assertFalse(defaultDataDir.exists());
- tableDataManager.reloadSegment(SEGMENT_NAME,
createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG), zkMetadata,
- localMetadata, false);
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
assertTrue(defaultDataDir.exists());
assertEquals(new SegmentMetadataImpl(defaultDataDir).getTotalDocs(), 5);
// Configured dataDir for coolTier, thus move to new dir.
- tableDataManager = createTableManager();
- tableDataManager.reloadSegment(SEGMENT_NAME,
createTierIndexLoadingConfig(TIER_TABLE_CONFIG), zkMetadata,
- localMetadata, false);
+ tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
+ .when(tableDataManager).fetchIndexLoadingConfig();
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
File tierDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME,
TIER_NAME, TIER_TABLE_CONFIG);
assertTrue(tierDataDir.exists());
assertFalse(defaultDataDir.exists());
@@ -205,15 +215,18 @@ public class BaseTableDataManagerTest {
SegmentMetadata localMetadata = mock(SegmentMetadata.class);
when(localMetadata.getCrc()).thenReturn(Long.toString(crc));
- BaseTableDataManager tableDataManager = createTableManager();
- tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(),
zkMetadata, localMetadata, false);
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
assertTrue(indexDir.exists());
assertEquals(new SegmentMetadataImpl(indexDir).getTotalDocs(), 5);
FileUtils.deleteQuietly(indexDir);
try {
- tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(),
zkMetadata, localMetadata, false);
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
fail();
} catch (Exception e) {
// As expected, segment reloading fails due to missing the local segment
dir.
@@ -234,17 +247,23 @@ public class BaseTableDataManagerTest {
when(localMetadata.getCrc()).thenReturn(Long.toString(crc));
// No dataDir for coolTier, thus stay on default tier.
- BaseTableDataManager tableDataManager = createTableManager();
- tableDataManager.reloadSegment(SEGMENT_NAME,
createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG), zkMetadata,
- localMetadata, false);
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
+ .when(tableDataManager).fetchIndexLoadingConfig();
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
assertTrue(indexDir.exists());
assertEquals(new SegmentMetadataImpl(indexDir).getTotalDocs(), 5);
// Configured dataDir for coolTier, thus move to new dir.
- tableDataManager = createTableManager();
- tableDataManager.reloadSegment(SEGMENT_NAME,
createTierIndexLoadingConfig(TIER_TABLE_CONFIG), zkMetadata,
- localMetadata, false);
+ tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
+ .when(tableDataManager).fetchIndexLoadingConfig();
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
File tierDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME,
TIER_NAME, TIER_TABLE_CONFIG);
assertTrue(tierDataDir.exists());
assertFalse(indexDir.exists());
@@ -267,8 +286,11 @@ public class BaseTableDataManagerTest {
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
- BaseTableDataManager tableDataManager = createTableManager();
- tableDataManager.reloadSegment(SEGMENT_NAME, indexLoadingConfig,
zkMetadata, localMetadata, false);
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation ->
indexLoadingConfig).when(tableDataManager).fetchIndexLoadingConfig();
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
assertTrue(indexDir.exists());
SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
@@ -295,8 +317,11 @@ public class BaseTableDataManagerTest {
.setInvertedIndexColumns(List.of(STRING_COLUMN, LONG_COLUMN)).build();
IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, SCHEMA);
- BaseTableDataManager tableDataManager = createTableManager();
- tableDataManager.reloadSegment(SEGMENT_NAME, indexLoadingConfig,
zkMetadata, localMetadata, false);
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation ->
indexLoadingConfig).when(tableDataManager).fetchIndexLoadingConfig();
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
assertTrue(indexDir.exists());
assertEquals(new SegmentMetadataImpl(indexDir).getTotalDocs(), 5);
@@ -311,22 +336,25 @@ public class BaseTableDataManagerTest {
SegmentZKMetadata zkMetadata =
makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION),
false);
-
- // Same CRC but force to download.
- BaseTableDataManager tableDataManager = createTableManager();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
assertEquals(Long.parseLong(segmentMetadata.getCrc()),
zkMetadata.getCrc());
+ // Same CRC but force to download.
+ BaseTableDataManager tableDataManager = spy(createTableManager());
+ tableDataManager.registerSegment(SEGMENT_NAME,
createImmutableSegmentDataManager(SEGMENT_NAME, segmentMetadata));
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+ doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
// Remove the local segment dir. Segment reloading fails unless force to
download.
FileUtils.deleteQuietly(indexDir);
try {
- tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(),
zkMetadata, segmentMetadata, false);
+ tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
fail();
} catch (Exception e) {
// As expected, segment reloading fails due to missing the local segment
dir.
}
- tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(),
zkMetadata, segmentMetadata, true);
+ tableDataManager.reloadSegment(SEGMENT_NAME, true, null);
assertTrue(indexDir.exists());
segmentMetadata = new SegmentMetadataImpl(indexDir);
assertEquals(Long.parseLong(segmentMetadata.getCrc()),
zkMetadata.getCrc());
@@ -415,7 +443,7 @@ public class BaseTableDataManagerTest {
tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
segmentDataManager);
// Mock the methods that will be called during segment replacement
- doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
// Use CountDownLatch to wait for async execution
@@ -458,7 +486,7 @@ public class BaseTableDataManagerTest {
tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
segmentDataManager);
// Mock the methods that will be called during segment replacement
- doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
.when(tableDataManager).fetchIndexLoadingConfig();
@@ -487,7 +515,7 @@ public class BaseTableDataManagerTest {
tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
segmentDataManager);
// Mock the methods for the second part of the test
- doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
.when(tableDataManager).fetchIndexLoadingConfig();
@@ -531,7 +559,7 @@ public class BaseTableDataManagerTest {
tableDataManager._segmentDataManagerMap.put(segmentName,
segmentDataManager);
// Mock the methods that will be called during segment replacement
- doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(segmentName);
+ seedZKMetadata(tableDataManager, segmentName, zkMetadata);
doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
// Use CountDownLatch to wait for async execution
@@ -890,32 +918,56 @@ public class BaseTableDataManagerTest {
}
}
- static OfflineTableDataManager createTableManager() {
+ protected BaseTableDataManager createTableManager() {
return createTableManager(createDefaultInstanceDataManagerConfig());
}
- static OfflineTableDataManager
createTableManagerWithAsyncSegmentRefreshEnabled() {
+ protected BaseTableDataManager
createTableManagerWithAsyncSegmentRefreshEnabled() {
return
createTableManagerWithAsyncSegmentRefreshEnabled(createDefaultInstanceDataManagerConfig());
}
- private static OfflineTableDataManager
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+ protected BaseTableDataManager createTableManager(InstanceDataManagerConfig
instanceDataManagerConfig) {
+ BaseTableDataManager tableDataManager = newTableDataManager();
+ tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
SCHEMA, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(), null, null,
SEGMENT_OPERATIONS_THROTTLER, false,
mock(ServerReloadJobStatusCache.class));
return tableDataManager;
}
- private static OfflineTableDataManager
createTableManagerWithAsyncSegmentRefreshEnabled(
+ protected BaseTableDataManager
createTableManagerWithAsyncSegmentRefreshEnabled(
InstanceDataManagerConfig instanceDataManagerConfig) {
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+ BaseTableDataManager tableDataManager = newTableDataManager();
+ tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
SCHEMA, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(), null, null,
SEGMENT_OPERATIONS_THROTTLER, true,
mock(ServerReloadJobStatusCache.class));
return tableDataManager;
}
- private static InstanceDataManagerConfig
createDefaultInstanceDataManagerConfig() {
+ /**
+ * Returns the concrete {@link BaseTableDataManager} instance under test.
Default returns a stock
+ * {@link OfflineTableDataManager}; subclasses override to test a different
implementation while inheriting
+ * all test bodies.
+ */
+ protected BaseTableDataManager newTableDataManager() {
+ return new OfflineTableDataManager();
+ }
+
+ /**
+ * Returns the {@link HelixManager} mock wired into the TDM under test.
Default returns a bare Mockito mock
+ * (no property store stubbed) — fine for the inherited test bodies which
never read ZK directly. Subclasses
+ * that exercise paths reading {@code _propertyStore} (e.g. {@code
fetchZKMetadata}, {@code fetchIndexLoadingConfig})
+ * override to stub {@code helixManager.getHelixPropertyStore()} with a
{@code FakePropertyStore} pre-seeded
+ * with table config + schema + per-segment ZK metadata.
+ */
+ protected HelixManager createHelixManagerMock() {
+ return mock(HelixManager.class);
+ }
+
+ protected void seedZKMetadata(BaseTableDataManager spy, String segmentName,
SegmentZKMetadata zkMetadata) {
+ doAnswer(invocation -> zkMetadata).when(spy).fetchZKMetadata(segmentName);
+ }
+
+ protected static InstanceDataManagerConfig
createDefaultInstanceDataManagerConfig() {
InstanceDataManagerConfig config = mock(InstanceDataManagerConfig.class);
when(config.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
// Check CRC matching on segment load time.
@@ -923,7 +975,7 @@ public class BaseTableDataManagerTest {
return config;
}
- private static File createSegment(SegmentVersion segmentVersion, int numRows)
+ protected static File createSegment(SegmentVersion segmentVersion, int
numRows)
throws Exception {
SegmentGeneratorConfig config = new
SegmentGeneratorConfig(DEFAULT_TABLE_CONFIG, SCHEMA);
config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
@@ -942,14 +994,14 @@ public class BaseTableDataManagerTest {
return new File(TABLE_DATA_DIR, SEGMENT_NAME);
}
- private static SegmentZKMetadata createRawSegment(SegmentVersion
segmentVersion, int numRows)
+ protected static SegmentZKMetadata createRawSegment(SegmentVersion
segmentVersion, int numRows)
throws Exception {
File indexDir = createSegment(segmentVersion, numRows);
return makeRawSegment(indexDir,
new File(TEMP_DIR, SEGMENT_NAME +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
}
- private static SegmentZKMetadata makeRawSegment(File indexDir, File
rawSegmentFile, boolean deleteIndexDir)
+ protected static SegmentZKMetadata makeRawSegment(File indexDir, File
rawSegmentFile, boolean deleteIndexDir)
throws Exception {
long crc = getCRC(indexDir);
SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME);
@@ -962,7 +1014,7 @@ public class BaseTableDataManagerTest {
return zkMetadata;
}
- private static long getCRC(File indexDir)
+ protected static long getCRC(File indexDir)
throws IOException {
File creationMetaFile =
SegmentDirectoryPaths.findCreationMetaFile(indexDir);
assertNotNull(creationMetaFile);
@@ -971,7 +1023,7 @@ public class BaseTableDataManagerTest {
}
}
- private IndexLoadingConfig createTierIndexLoadingConfig(TableConfig
tableConfig) {
+ protected IndexLoadingConfig createTierIndexLoadingConfig(TableConfig
tableConfig) {
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getSegmentDirectoryLoader()).thenReturn(TIER_SEGMENT_DIRECTORY_LOADER);
when(instanceDataManagerConfig.getConfig()).thenReturn(new
PinotConfiguration());
@@ -981,14 +1033,19 @@ public class BaseTableDataManagerTest {
return indexLoadingConfig;
}
- private ImmutableSegmentDataManager createImmutableSegmentDataManager(String
segmentName, long crc) {
+ protected ImmutableSegmentDataManager
createImmutableSegmentDataManager(String segmentName, long crc) {
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(segmentMetadata.getCrc()).thenReturn(Long.toString(crc));
+ return createImmutableSegmentDataManager(segmentName, segmentMetadata);
+ }
+
+ protected ImmutableSegmentDataManager
createImmutableSegmentDataManager(String segmentName,
+ SegmentMetadata segmentMetadata) {
ImmutableSegmentDataManager segmentDataManager =
mock(ImmutableSegmentDataManager.class);
when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
- SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
- when(segmentMetadata.getCrc()).thenReturn(Long.toString(crc));
return segmentDataManager;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
index 2edd3c134a2..e8f5796d73c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
@@ -99,8 +99,6 @@ public abstract class SegmentDataManager {
* The data manager can only be destroyed once.
*/
public void destroy() {
- // NOTE: We want the test to catch the case when destroy is called without
offloading, but not fail the production.
- assert _offloaded.get() : "Cannot destroy segment data manager without
offloading it first";
offload();
if (_destroyed.compareAndSet(false, true)) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 15a8a7565e6..cc12bfcec7a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -194,6 +194,13 @@ public interface TableDataManager {
void offloadSegmentUnsafe(String segmentName)
throws Exception;
+ /**
+ * Deletes a segment from this table — offloads it if currently loaded, then
removes its on-disk data (the per-segment
+ * directory and any tier-specific artefacts).
+ */
+ void deleteSegment(String segmentName)
+ throws Exception;
+
/**
* Reloads an existing immutable segment for the table, which can be an
OFFLINE or REALTIME table.
* A new segment may be downloaded if the local one has a different CRC; or
can be forced to download if
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index ebe224f73d9..4c409d211b4 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -48,6 +48,7 @@ import
org.apache.pinot.common.config.provider.LogicalTableMetadataCache;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.LogicalTableContext;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
@@ -64,9 +65,6 @@ import
org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -389,38 +387,23 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
public void deleteSegment(String tableNameWithType, String segmentName)
throws Exception {
LOGGER.info("Deleting segment: {} from table: {}", segmentName,
tableNameWithType);
- // Segment deletion is handled at instance level because table data
manager might not exist. Acquire the lock here.
+ // Hold the per-segment lock around the TDM lookup so the lookup + delete
is atomic vs. removeTableDataManager
+ // shutting the TDM down concurrently. The TDM's own deleteSegment
re-acquires the same lock (ReentrantLock).
Lock segmentLock = _segmentLocks.getLock(tableNameWithType, segmentName);
segmentLock.lock();
try {
- // Check if the segment is still loaded, if so, offload it first.
- // This might happen when the server disconnected from ZK and
reconnected, and the segment is still loaded.
- // TODO: Consider using table data manager to delete the segment. This
will allow the table data manager to clean
- // up the segment data on all tiers. Note that table data manager
might have not been created, and table
- // config might have been deleted at this point.
TableDataManager tableDataManager =
_tableDataManagerMap.get(tableNameWithType);
- if (tableDataManager != null &&
tableDataManager.hasSegment(segmentName)) {
- LOGGER.warn("Segment: {} from table: {} is still loaded, offloading it
first", segmentName, tableNameWithType);
- tableDataManager.offloadSegment(segmentName);
- }
- // Clean up the segment data on default tier unconditionally.
- File segmentDir = getSegmentDataDirectory(tableNameWithType,
segmentName);
- if (segmentDir.exists()) {
- FileUtils.deleteQuietly(segmentDir);
- LOGGER.info("Deleted segment directory {} on default tier",
segmentDir);
- }
- // We might clean up further more with the specific segment loader. But
note that table data manager might have
- // not been created, and table config might have been deleted at this
point.
- SegmentDirectoryLoader segmentLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(
- _instanceDataManagerConfig.getSegmentDirectoryLoader());
- if (segmentLoader != null) {
- LOGGER.info("Deleting segment: {} further with segment loader: {}",
segmentName,
- _instanceDataManagerConfig.getSegmentDirectoryLoader());
- SegmentDirectoryLoaderContext ctx = new
SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
- .setTableDataDir(_instanceDataManagerConfig.getInstanceDataDir() +
"/" + tableNameWithType).build();
- segmentLoader.delete(ctx);
+ if (tableDataManager != null) {
+ // The TDM owns the offload-if-loaded prelude, the on-disk dir delete,
and the tier-aware
+ // segment-directory-loader cleanup.
+ tableDataManager.deleteSegment(segmentName);
+ } else {
+ // Fallback: TDM can be null if it was never instantiated, or has
already been removed via deleteTable.
+ // In that case, do a path-only cleanup keyed by segment name.
+ String tableDataDir = _instanceDataManagerConfig.getInstanceDataDir()
+ "/" + tableNameWithType;
+ BaseTableDataManager.deleteSegmentFilesFromDisk(tableDataDir,
segmentName, _instanceDataManagerConfig);
+ LOGGER.info("Deleted segment: {} from table: {}", segmentName,
tableNameWithType);
}
- LOGGER.info("Deleted segment: {} from table: {}", segmentName,
tableNameWithType);
} finally {
segmentLock.unlock();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]