This is an automated email from the ASF dual-hosted git repository.

noob-se7en pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e76da0e6870 Refactor BaseTableDataManager  (#18381)
e76da0e6870 is described below

commit e76da0e6870f0f0486f5e5c37ad6494f076995e0
Author: Krishan Goyal <[email protected]>
AuthorDate: Tue May 19 17:36:58 2026 +0530

    Refactor BaseTableDataManager  (#18381)
    
    * Refactor BaseTableDataManager so multi-segment managers can compose its 
building blocks
    
    Three small refactors that keep single-segment behavior identical and let a
    future multi-segment SegmentDataManager (e.g. a wrapper around N constituent
    segments) reuse the same load and reload primitives without forking them:
    
    1. Extract `protected ImmutableSegment loadSegment(zkMetadata, ilc)` from
       `downloadAndLoadSegment`. The new helper performs only the download +
       `ImmutableSegmentLoader.load`, returning the segment without registering 
it
       in `_segmentDataManagerMap` or invoking upsert hooks. Single-segment 
callers
       continue to use `downloadAndLoadSegment`, which now composes the helper +
       `addSegment(...)`. This lets a multi-segment manager load all of its 
members
       first and register a single wrapper entry under one name.
    
    2. Push `_segmentReloadSemaphore` acquire/release down into
       `reloadSegment(SegmentDataManager, IndexLoadingConfig, boolean)`. The 
public
       `reloadSegment(String)` and the private parallel 
`reloadSegments(List<SDM>)`
       both used to wrap the inner call with the semaphore; that acquire is now
       inside the per-physical-segment body and the outer wrappers are removed
       (which would otherwise double-acquire on a non-reentrant semaphore). For
       non-group tables this is structurally identical (one segment -> one 
acquire
       -> one release; same concurrency bound). For multi-segment managers that
       fan out N reloads, each member contends for a slot independently.
    
    3. Drop `@VisibleForTesting` on `isSegmentStale(IndexLoadingConfig,
       SegmentDataManager)` and widen to plain `protected` so subclasses can 
call
       it from group-aware overrides of `getStaleSegments` / 
`needReloadSegments`.
    
    The semaphore stays at the orchestration boundary in `doReplaceSegment` (not
    relocated into `replaceSegmentIfCrcMismatch`), because subclasses commonly
    override `replaceSegmentIfCrcMismatch` and a relocation there would leak the
    acquire across paths that bypass the override; subclasses needing per-member
    acquire on a multi-segment replace can wrap the call themselves.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Cleanup comment
    
    * Refactor BaseTDM and its tests to enable cleaner extensions, abstractions 
and ownership for future work
    
    * Cleanup deleteSegment
    
    * Move offloadSegment to deleteSegment
    
    * Reload should take lock first and then acquire semaphore
    
    * Cleanup BaseTDM
    
    * Acquire then take segment lock in reload
    
    * Acquire reload semaphore for realtime force commit too
    
    * Make some accessors public in TDM
    
    * Make some accessors public in TDM
    
    * Avoid sharing _segmentDataManagerMap publicly
    
    * Reload Test case fixes
    
    * Add createHelixManagerMock
    
    * Rename to tryLoadExistingSegmentWithoutRegistering
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../core/data/manager/BaseTableDataManager.java    | 151 +++++++++++++++++----
 .../BaseTableDataManagerAcquireSegmentTest.java    |  98 ++++++++-----
 .../BaseTableDataManagerNeedRefreshTest.java       | 103 +++++++-------
 .../data/manager/BaseTableDataManagerTest.java     | 151 ++++++++++++++-------
 .../local/data/manager/SegmentDataManager.java     |   2 -
 .../local/data/manager/TableDataManager.java       |   7 +
 .../starter/helix/HelixInstanceDataManager.java    |  43 ++----
 7 files changed, 369 insertions(+), 186 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 2bd134a9e52..e0b2e76980b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -606,6 +607,56 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     }
   }
 
+  @Override
+  public void deleteSegment(String segmentName)
+      throws Exception {
+    _logger.info("Deleting segment: {}", segmentName);
+    Lock segmentLock = getSegmentLock(segmentName);
+    segmentLock.lock();
+    try {
+      if (hasSegment(segmentName)) {
+        _logger.warn("Segment: {} is still loaded, offloading it before 
delete", segmentName);
+        offloadSegment(segmentName);
+      }
+      doDeleteSegment(segmentName);
+    } catch (Exception e) {
+      addSegmentError(segmentName,
+          new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception 
while deleting segment", e));
+      throw e;
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  protected void doDeleteSegment(String segmentName)
+      throws Exception {
+    deleteSegmentFilesFromDisk(_tableDataDir, segmentName, 
_instanceDataManagerConfig);
+    _logger.info("Deleted segment: {}", segmentName);
+  }
+
+  /**
+   * Removes the segment directory locally and does tier-aware cleanup too
+   */
+  public static void deleteSegmentFilesFromDisk(String tableDataDir, String 
segmentName,
+      InstanceDataManagerConfig instanceConfig)
+      throws Exception {
+    File segmentDir = new File(tableDataDir, segmentName);
+    if (segmentDir.exists()) {
+      FileUtils.deleteQuietly(segmentDir);
+      LOGGER.info("Deleted segment directory {} on default tier", segmentDir);
+    }
+    SegmentDirectoryLoader segmentLoader =
+        
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(instanceConfig.getSegmentDirectoryLoader());
+    if (segmentLoader != null) {
+      LOGGER.info("Deleting segment: {} further with segment loader: {}", 
segmentName,
+          instanceConfig.getSegmentDirectoryLoader());
+      SegmentDirectoryLoaderContext ctx = new 
SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
+          .setTableDataDir(tableDataDir)
+          .build();
+      segmentLoader.delete(ctx);
+    }
+  }
+
   @Override
   public void reloadSegment(String segmentName, boolean forceDownload, String 
reloadJobId)
       throws Exception {
@@ -616,12 +667,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     SegmentDataManager segmentDataManager = 
_segmentDataManagerMap.get(segmentName);
     if (segmentDataManager != null) {
       IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
-      _segmentReloadSemaphore.acquire(segmentName, _logger);
-      try {
-        reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
-      } finally {
-        _segmentReloadSemaphore.release();
-      }
+      reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
     } else {
       _logger.warn("Failed to find segment: {}, skipping reloading it", 
segmentName);
     }
@@ -771,6 +817,36 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     return _segmentPreloadExecutor;
   }
 
+  public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
+    return _propertyStore;
+  }
+
+  @Nullable
+  public SegmentDataManager getSegmentDataManager(String segmentName) {
+    return _segmentDataManagerMap.get(segmentName);
+  }
+
+  public Collection<SegmentDataManager> getAllSegmentDataManagers() {
+    return Collections.unmodifiableCollection(_segmentDataManagerMap.values());
+  }
+
+  public Logger getLogger() {
+    return _logger;
+  }
+
+  public SegmentReloadSemaphore getSegmentReloadSemaphore() {
+    return _segmentReloadSemaphore;
+  }
+
+  @Nullable
+  public SegmentOperationsThrottlerSet getSegmentOperationsThrottlerSet() {
+    return _segmentOperationsThrottlerSet;
+  }
+
+  public ServerMetrics getServerMetrics() {
+    return _serverMetrics;
+  }
+
   @Override
   public boolean isDeleted() {
     return _isDeleted;
@@ -947,12 +1023,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     
CompletableFuture.allOf(segmentDataManagers.stream().map(segmentDataManager -> 
CompletableFuture.runAsync(() -> {
       String segmentName = segmentDataManager.getSegmentName();
       try {
-        _segmentReloadSemaphore.acquire(segmentName, _logger);
-        try {
-          reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
-        } finally {
-          _segmentReloadSemaphore.release();
-        }
+        reloadSegment(segmentDataManager, indexLoadingConfig, forceDownload);
       } catch (Throwable t) {
         _logger.error("Caught exception while reloading segment: {}", 
segmentName, t);
         failedSegments.add(segmentName);
@@ -994,7 +1065,12 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
               + "Change the cluster config: {} to `PROTECTED` for safer 
commit", segmentName, config.getConfigKey());
         } else {
           _logger.info("Reloading (force committing) consuming segment: {}", 
segmentName);
-          ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+          _segmentReloadSemaphore.acquire(segmentName, _logger);
+          try {
+            ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
+          } finally {
+            _segmentReloadSemaphore.release();
+          }
         }
       } else {
         _logger.warn("Skip reloading consuming segment: {} as configured", 
segmentName);
@@ -1014,9 +1090,17 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     indexLoadingConfig.setSegmentTier(segmentTier);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
     File indexDir = getSegmentDataDir(segmentName, segmentTier, 
indexLoadingConfig.getTableConfig());
+    _segmentReloadSemaphore.acquire(segmentName, _logger);
     Lock segmentLock = getSegmentLock(segmentName);
     segmentLock.lock();
     try {
+      // Re-validate under the segment lock: the caller's map read in 
reloadSegment(String, boolean, String) is
+      // unlocked, so a concurrent offloadSegment can remove the entry between 
that read and this lock acquisition.
+      // Without this guard, reload would resurrect a segment the cluster has 
already dropped.
+      if (_segmentDataManagerMap.get(segmentName) == null) {
+        _logger.warn("Skipping reload for segment: {} — concurrently offloaded 
after dispatch", segmentName);
+        return;
+      }
       /*
       Determines if a segment should be downloaded from deep storage based on:
       1. A forced download flag.
@@ -1092,6 +1176,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       throw reloadFailureException;
     } finally {
       segmentLock.unlock();
+      _segmentReloadSemaphore.release();
     }
     _logger.info("Reloaded segment: {}", segmentName);
   }
@@ -1116,7 +1201,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
    * existing segment with the same name.
    */
   @Nullable
-  protected SegmentDataManager registerSegment(String segmentName, 
SegmentDataManager segmentDataManager) {
+  public SegmentDataManager registerSegment(String segmentName, 
SegmentDataManager segmentDataManager) {
     SegmentDataManager oldSegmentDataManager;
     synchronized (_segmentDataManagerMap) {
       oldSegmentDataManager = _segmentDataManagerMap.put(segmentName, 
segmentDataManager);
@@ -1133,7 +1218,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
    * identify this scenario.
    */
   @Nullable
-  protected SegmentDataManager unregisterSegment(String segmentName) {
+  public SegmentDataManager unregisterSegment(String segmentName) {
     _recentlyDeletedSegments.put(segmentName, segmentName);
     synchronized (_segmentDataManagerMap) {
       return _segmentDataManagerMap.remove(segmentName);
@@ -1145,7 +1230,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
    * Segment can be downloaded from deep store or from peer servers. 
Downloaded segment might be compressed or
    * encrypted, and this method takes care of decompressing and decrypting the 
segment.
    */
-  protected File downloadSegment(SegmentZKMetadata zkMetadata)
+  public File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
     String segmentName = zkMetadata.getSegmentName();
     String downloadUrl = zkMetadata.getDownloadUrl();
@@ -1376,7 +1461,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
    * The original index directory is restored lazily, as depending on the 
conditions,
    * it may be restored from the backup directory or segment downloaded from 
deep store.
    */
-  protected void createBackup(File indexDir) {
+  public void createBackup(File indexDir) {
     if (!indexDir.exists()) {
       return;
     }
@@ -1393,7 +1478,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
    * When we rename the segment backup directory to segment temporary 
directory, we know the reload
    * already succeeded, so that we can safely delete the segment temporary 
directory.
    */
-  protected void removeBackup(File indexDir)
+  public void removeBackup(File indexDir)
       throws IOException {
     File parentDir = indexDir.getParentFile();
     File segmentBackupDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
@@ -1410,6 +1495,22 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
   @Override
   public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig) {
+    ImmutableSegment segment = 
tryLoadExistingSegmentWithoutRegistering(zkMetadata, indexLoadingConfig);
+    if (segment == null) {
+      return false;
+    }
+    addSegment(segment, zkMetadata);
+    return true;
+  }
+
+  /**
+   * Just Loads a segment from the existing on-disk copy without registering 
it in {@code _segmentDataManagerMap} or
+   * invoking other hooks.
+   * Returns {@code null} when the on-disk copy is absent, has a stale CRC 
under or fails to load
+   */
+  @Nullable
+  public ImmutableSegment 
tryLoadExistingSegmentWithoutRegistering(SegmentZKMetadata zkMetadata,
+      IndexLoadingConfig indexLoadingConfig) {
     String segmentName = zkMetadata.getSegmentName();
     Preconditions.checkState(!_shutDown,
         "Table data manager is already shut down, cannot load existing 
segment: %s of table: %s", segmentName,
@@ -1442,14 +1543,14 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     if (segmentMetadata == null) {
       _logger.info("Segment: {} does not exist", segmentName);
       closeSegmentDirectoryQuietly(segmentDirectory);
-      return false;
+      return null;
     }
     if (isSegmentStatusCompleted(zkMetadata) && !hasSameCRC(zkMetadata, 
segmentMetadata)) {
       _logger.warn("Segment: {} has CRC changed from: {} to: {}", segmentName, 
segmentMetadata.getCrc(),
           zkMetadata.getCrc());
       if (_instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad()) {
         closeSegmentDirectoryQuietly(segmentDirectory);
-        return false;
+        return null;
       }
       _logger.info("Skipping CRC check for segment: {} as configured. Proceed 
to load segment.", segmentName);
     }
@@ -1470,15 +1571,14 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
             indexLoadingConfig, zkMetadata);
       }
       ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig);
-      addSegment(segment, zkMetadata);
       _logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", 
segmentName, zkMetadata.getCrc(),
           TierConfigUtils.normalizeTierName(segmentTier));
-      return true;
+      return segment;
     } catch (Exception e) {
       _logger.error("Failed to load existing segment: {} with CRC: {} on tier: 
{}", segmentName, zkMetadata.getCrc(),
           TierConfigUtils.normalizeTierName(segmentTier), e);
       closeSegmentDirectoryQuietly(segmentDirectory);
-      return false;
+      return null;
     }
   }
 
@@ -1542,8 +1642,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     return staleSegments;
   }
 
-  @VisibleForTesting
-  StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, 
SegmentDataManager segmentDataManager) {
+  public StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, 
SegmentDataManager segmentDataManager) {
     TableConfig tableConfig = indexLoadingConfig.getTableConfig();
     Schema schema = indexLoadingConfig.getSchema();
     assert tableConfig != null && schema != null;
@@ -1847,7 +1946,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
   // CRC check can be performed on both segment CRC and data CRC (if 
available) based on the ZK property value of
   // useDataCRC.
-  protected static boolean hasSameCRC(SegmentZKMetadata zkMetadata, 
SegmentMetadata localMetadata) {
+  public static boolean hasSameCRC(SegmentZKMetadata zkMetadata, 
SegmentMetadata localMetadata) {
     if (zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc())) {
       return true;
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 519a3e8ff9f..a6d48b56e06 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -31,7 +31,6 @@ import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -73,7 +72,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
   private final Set<ImmutableSegment> _allSegments = new HashSet<>();
   private final Set<SegmentDataManager> _accessedSegManagers = 
ConcurrentHashMap.newKeySet();
   private final Set<SegmentDataManager> _allSegManagers = 
ConcurrentHashMap.newKeySet();
-  private Map<String, ImmutableSegmentDataManager> _internalSegMap;
+  protected Map<String, SegmentDataManager> _internalSegMap;
   private Throwable _exception;
   private Thread _masterThread;
   // Segment numbers in place.
@@ -115,7 +114,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
     _masterThread = null;
   }
 
-  private TableDataManager makeTestableManager()
+  protected TableDataManager makeTestableManager()
       throws Exception {
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(_tmpDir.getAbsolutePath());
@@ -129,18 +128,51 @@ public class BaseTableDataManagerAcquireSegmentTest {
             new SegmentOperationsThrottler(4, 8, true),
             new SegmentOperationsThrottler(10, 20, true),
             new SegmentOperationsThrottler(4, 8, true));
-    TableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig, schema,
+    TableDataManager tableDataManager = newTableDataManager();
+    tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), 
new SegmentLocks(), tableConfig, schema,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, segmentOperationsThrottlerSet,
         false, mock(ServerReloadJobStatusCache.class));
     tableDataManager.start();
     Field segsMapField = 
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
-    _internalSegMap = (Map<String, ImmutableSegmentDataManager>) 
segsMapField.get(tableDataManager);
+    _internalSegMap = (Map<String, SegmentDataManager>) 
segsMapField.get(tableDataManager);
     return tableDataManager;
   }
 
-  private ImmutableSegment makeImmutableSegment(String segmentName, int 
totalDocs) {
+  /**
+   * Returns the concrete {@link TableDataManager} instance under test. 
Default returns a stock
+   * {@link OfflineTableDataManager}; subclasses override to test a different 
implementation while inheriting all
+   * test bodies.
+   */
+  protected TableDataManager newTableDataManager() {
+    return new OfflineTableDataManager();
+  }
+
+  /**
+   * Returns the {@link HelixManager} mock wired into the TDM under test. 
Default returns a bare Mockito mock
+   * — fine for the inherited test bodies which do not read cluster config or 
property store directly.
+   * Subclasses that exercise paths reading those override to stub them.
+   */
+  protected HelixManager createHelixManagerMock() {
+    return mock(HelixManager.class);
+  }
+
+  /**
+   * Returns the TDM-map key under which a segment with the given name is 
registered.
+   */
+  protected String tdmKey(String segmentName) {
+    return segmentName;
+  }
+
+  protected void addSegment(TableDataManager tdm, ImmutableSegment seg) {
+    tdm.addSegment(seg);
+  }
+
+  protected ImmutableSegment getInnerSegment(SegmentDataManager sdm) {
+    return (ImmutableSegment) sdm.getSegment();
+  }
+
+  protected ImmutableSegment makeImmutableSegment(String segmentName, int 
totalDocs) {
     ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
     SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
     when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
@@ -164,11 +196,11 @@ public class BaseTableDataManagerAcquireSegmentTest {
     // Add the segment, get it for use, remove the segment, and then return it.
     // Make sure that the segment is not destroyed before return.
     ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, 
totalDocs);
-    tableDataManager.addSegment(immutableSegment);
+    addSegment(tableDataManager, immutableSegment);
     Assert.assertEquals(tableDataManager.getNumSegments(), 1);
-    SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
+    SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(tdmKey(segmentName));
     Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
-    tableDataManager.offloadSegment(segmentName);
+    tableDataManager.offloadSegment(tdmKey(segmentName));
     Assert.assertEquals(tableDataManager.getNumSegments(), 0);
     Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
     Assert.assertEquals(_nDestroys, 0);
@@ -177,7 +209,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
     Assert.assertEquals(_nDestroys, 1);
 
     // Now the segment should not be available for use.Also, returning a null 
reader is fine
-    segmentDataManager = tableDataManager.acquireSegment(segmentName);
+    segmentDataManager = tableDataManager.acquireSegment(tdmKey(segmentName));
     Assert.assertNull(segmentDataManager);
     List<SegmentDataManager> segmentDataManagers = 
tableDataManager.acquireAllSegments();
     Assert.assertEquals(segmentDataManagers.size(), 0);
@@ -185,26 +217,26 @@ public class BaseTableDataManagerAcquireSegmentTest {
     // If a caller tries to acquire the deleted segment using acquireSegments, 
it will be returned in
     // notAcquiredSegments. The isSegmentDeletedRecently method should return 
true.
     List<String> notAcquiredSegments = new ArrayList<>();
-    tableDataManager.acquireSegments(List.of(segmentName), 
notAcquiredSegments);
+    tableDataManager.acquireSegments(List.of(tdmKey(segmentName)), 
notAcquiredSegments);
     Assert.assertEquals(notAcquiredSegments.size(), 1);
-    Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(segmentName));
+    
Assert.assertTrue(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName)));
 
     // Adding and removing the segment again is fine. After adding the segment 
back, isSegmentDeletedRecently should
     // return false.
-    tableDataManager.addSegment(immutableSegment);
-    Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(segmentName));
-    tableDataManager.offloadSegment(segmentName);
+    addSegment(tableDataManager, immutableSegment);
+    
Assert.assertFalse(tableDataManager.isSegmentDeletedRecently(tdmKey(segmentName)));
+    tableDataManager.offloadSegment(tdmKey(segmentName));
 
     // Removing the segment again is fine.
-    tableDataManager.offloadSegment(segmentName);
+    tableDataManager.offloadSegment(tdmKey(segmentName));
     Assert.assertEquals(tableDataManager.getNumSegments(), 0);
 
     // Add a new segment and remove it in order this time.
     final String anotherSeg = "AnotherSegment";
     ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
-    tableDataManager.addSegment(ix1);
+    addSegment(tableDataManager, ix1);
     Assert.assertEquals(tableDataManager.getNumSegments(), 1);
-    SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
+    SegmentDataManager sdm1 = 
tableDataManager.acquireSegment(tdmKey(anotherSeg));
     Assert.assertNotNull(sdm1);
     Assert.assertEquals(sdm1.getReferenceCount(), 2);
     // acquire all segments
@@ -220,15 +252,15 @@ public class BaseTableDataManagerAcquireSegmentTest {
     Assert.assertEquals(sdm1.getReferenceCount(), 1);
     // Now replace the segment with another one.
     ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
-    tableDataManager.addSegment(ix2);
+    addSegment(tableDataManager, ix2);
     Assert.assertEquals(tableDataManager.getNumSegments(), 1);
     // Now the previous one should have been destroyed, and
     Assert.assertEquals(sdm1.getReferenceCount(), 0);
     verify(ix1, times(1)).destroy();
     // Delete ix2 without accessing it.
-    SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg);
+    SegmentDataManager sdm2 = _internalSegMap.get(tdmKey(anotherSeg));
     Assert.assertEquals(sdm2.getReferenceCount(), 1);
-    tableDataManager.offloadSegment(anotherSeg);
+    tableDataManager.offloadSegment(tdmKey(anotherSeg));
     Assert.assertEquals(tableDataManager.getNumSegments(), 0);
     Assert.assertEquals(sdm2.getReferenceCount(), 0);
     verify(ix2, times(1)).destroy();
@@ -265,8 +297,8 @@ public class BaseTableDataManagerAcquireSegmentTest {
 
     for (int i = _lo; i <= _hi; i++) {
       final String segName = SEGMENT_PREFIX + i;
-      tableDataManager.addSegment(makeImmutableSegment(segName, 
_random.nextInt()));
-      _allSegManagers.add(_internalSegMap.get(segName));
+      addSegment(tableDataManager, makeImmutableSegment(segName, 
_random.nextInt()));
+      _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
     }
 
     runStorageServer(numQueryThreads, runTimeSec, tableDataManager);  // 
replaces segments while online
@@ -315,14 +347,14 @@ public class BaseTableDataManagerAcquireSegmentTest {
     for (SegmentDataManager segmentDataManager : _internalSegMap.values()) {
       Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
       // We should never have called destroy on these segments. Remove it from 
the list of accessed segments.
-      verify(segmentDataManager.getSegment(), never()).destroy();
+      verify(getInnerSegment(segmentDataManager), never()).destroy();
       _allSegManagers.remove(segmentDataManager);
       _accessedSegManagers.remove(segmentDataManager);
     }
 
     // For the remaining segments in accessed list, destroy must have been 
called exactly once.
     for (SegmentDataManager segmentDataManager : _allSegManagers) {
-      verify(segmentDataManager.getSegment(), times(1)).destroy();
+      verify(getInnerSegment(segmentDataManager), times(1)).destroy();
       // Also their count should be 0
       Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
     }
@@ -361,7 +393,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
             Set<Integer> segmentIds = pickSegments();
             List<String> segmentList = new ArrayList<>(segmentIds.size());
             for (Integer segmentId : segmentIds) {
-              segmentList.add(SEGMENT_PREFIX + segmentId);
+              segmentList.add(tdmKey(SEGMENT_PREFIX + segmentId));
             }
             segmentDataManagers = 
_tableDataManager.acquireSegments(segmentList, new ArrayList<>());
           }
@@ -448,8 +480,9 @@ public class BaseTableDataManagerAcquireSegmentTest {
     private void addSegment() {
       final int segmentToAdd = _hi + 1;
       final String segName = SEGMENT_PREFIX + segmentToAdd;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, 
_random.nextInt()));
-      _allSegManagers.add(_internalSegMap.get(segName));
+      BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager,
+          makeImmutableSegment(segName, _random.nextInt()));
+      _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
       _hi = segmentToAdd;
     }
 
@@ -457,8 +490,9 @@ public class BaseTableDataManagerAcquireSegmentTest {
     private void replaceSegment() {
       int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
       final String segName = SEGMENT_PREFIX + segToReplace;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, 
_random.nextInt()));
-      _allSegManagers.add(_internalSegMap.get(segName));
+      BaseTableDataManagerAcquireSegmentTest.this.addSegment(_tableDataManager,
+          makeImmutableSegment(segName, _random.nextInt()));
+      _allSegManagers.add(_internalSegMap.get(tdmKey(segName)));
     }
 
     // Remove the segment _lo and then bump _lo
@@ -466,7 +500,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
         throws Exception {
       // Keep at least one segment in place.
       if (_hi > _lo) {
-        _tableDataManager.offloadSegment(SEGMENT_PREFIX + _lo);
+        _tableDataManager.offloadSegment(tdmKey(SEGMENT_PREFIX + _lo));
         _lo++;
       } else {
         addSegment();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
index 61657b8e5fd..c1ef555a3af 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -52,6 +52,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -64,7 +65,7 @@ import static org.testng.Assert.assertTrue;
 
 
 @Test
-public class BaseTableDataManagerNeedRefreshTest {
+public class BaseTableDataManagerNeedRefreshTest extends 
BaseTableDataManagerTest {
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"BaseTableDataManagerNeedRefreshTest");
   private static final String DEFAULT_TABLE_NAME = "mytable";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
@@ -91,7 +92,7 @@ public class BaseTableDataManagerNeedRefreshTest {
   private static final Schema SCHEMA;
   private static final IndexLoadingConfig INDEX_LOADING_CONFIG;
   private static final ImmutableSegmentDataManager 
IMMUTABLE_SEGMENT_DATA_MANAGER;
-  private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
+  private BaseTableDataManager _baseTableDataManager;
 
   private String _testName = "defaultTestName";
 
@@ -102,12 +103,16 @@ public class BaseTableDataManagerNeedRefreshTest {
       INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
       IMMUTABLE_SEGMENT_DATA_MANAGER =
           createImmutableSegmentDataManager(INDEX_LOADING_CONFIG, 
"basicSegment", generateRows());
-      BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  @BeforeClass
+  public void initBaseTableDataManager() {
+    _baseTableDataManager = createTableManager();
+  }
+
   protected static TableConfigBuilder getTableConfigBuilder() {
     return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
         .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
@@ -198,7 +203,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
     File indexDir = createSegment(indexLoadingConfig, segmentName, rows);
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig,
-        BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER);
+        SEGMENT_OPERATIONS_THROTTLER);
     when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
     return segmentDataManager;
   }
@@ -233,7 +238,7 @@ public class BaseTableDataManagerNeedRefreshTest {
 
     ImmutableSegmentDataManager segmentDataManager =
         createImmutableSegmentDataManager(indexLoadingConfig, "noChanges", 
List.of(row));
-    BaseTableDataManager tableDataManager = 
BaseTableDataManagerTest.createTableManager();
+    BaseTableDataManager tableDataManager = createTableManager();
 
     StaleSegment response = 
tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager);
     assertFalse(response.isStale());
@@ -247,7 +252,7 @@ public class BaseTableDataManagerNeedRefreshTest {
   @Test
   void testChangeTimeColumn() {
     TableConfig tableConfig = 
getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
+    StaleSegment response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "time column");
@@ -258,7 +263,7 @@ public class BaseTableDataManagerNeedRefreshTest {
       throws Exception {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
+    StaleSegment response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "column deleted: textColumn");
@@ -270,7 +275,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
     schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, 
FieldSpec.DataType.STRING, true));
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
+    StaleSegment response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "field type changed: textColumn");
@@ -282,7 +287,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
     schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, 
FieldSpec.DataType.INT, true));
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
+    StaleSegment response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "data type changed: textColumn");
@@ -294,7 +299,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
     schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, 
FieldSpec.DataType.STRING, false));
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
+    StaleSegment response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "single / multi value changed: 
textColumn");
@@ -306,7 +311,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN_MV);
     schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, 
FieldSpec.DataType.STRING, true));
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
+    StaleSegment response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(TABLE_CONFIG, schema),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "single / multi value changed: 
textColumnMV");
@@ -317,12 +322,12 @@ public class BaseTableDataManagerNeedRefreshTest {
     // Check with a column that is not sorted
     TableConfig tableConfig = 
getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "sort column changed: 
MilliSecondsSinceEpoch");
     // Check with a column that is sorted
     
tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN));
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
+    assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
   }
 
   @DataProvider(name = "testFilterArgs")
@@ -357,17 +362,17 @@ public class BaseTableDataManagerNeedRefreshTest {
         createImmutableSegmentDataManager(indexLoadingConfig, segmentName, 
generateRows());
 
     // When TableConfig has a filter but segment does not have, needRefresh is 
true.
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), expectedReason);
 
     // When TableConfig does not have a filter but segment has, needRefresh is 
true
-    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentWithFilter);
+    response = _baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentWithFilter);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), expectedReason);
 
     // When TableConfig has a filter AND segment also has a filter, 
needRefresh is false
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
segmentWithFilter).isStale());
+    assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
segmentWithFilter).isStale());
   }
 
   @Test
@@ -380,17 +385,17 @@ public class BaseTableDataManagerNeedRefreshTest {
         createImmutableSegmentDataManager(indexLoadingConfig, 
"partitionWithModulo", generateRows());
 
     // when segment has no partition AND tableConfig has partitions then 
needRefresh = true
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "partition function added: 
partitionedColumn");
 
     // when segment has partitions AND tableConfig has no partitions, then 
needRefresh = false
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentWithPartition).isStale());
+    assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentWithPartition).isStale());
 
     // when # of partitions is different, then needRefresh = true
     TableConfig partitionedTableConfig40 = 
getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
         Map.of(PARTITIONED_COLUMN_NAME, new 
ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build();
-    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
+    response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
         segmentWithPartition);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "num partitions changed: 
partitionedColumn");
@@ -399,7 +404,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig partitionedTableConfigMurmur = 
getTableConfigBuilder().setSegmentPartitionConfig(
         new SegmentPartitionConfig(
             Map.of(PARTITIONED_COLUMN_NAME, new 
ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build();
-    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
+    response = _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
         segmentWithPartition);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "partition function name changed: 
partitionedColumn");
@@ -414,12 +419,12 @@ public class BaseTableDataManagerNeedRefreshTest {
         createImmutableSegmentDataManager(indexLoadingConfig, 
"withoutNullHandling", generateRows());
 
     // If null handling is removed from table config AND segment has NVV, then 
NVV can be removed. needRefresh = true
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "null value vector index removed from 
column: NullValueColumn");
 
     // if NVV is added to table config AND segment does not have NVV, then it 
cannot be added. needRefresh = false
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentWithoutNullHandling).isStale());
+    assertFalse(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentWithoutNullHandling).isStale());
   }
 
   @Test
@@ -428,7 +433,7 @@ public class BaseTableDataManagerNeedRefreshTest {
         new MultiColumnTextIndexConfig(List.of(MC_TEXT_INDEX_COLUMN_1, 
MC_TEXT_INDEX_COLUMN_2));
     TableConfig tableConfig =
         
getTableConfigBuilder().setMultiColumnTextIndexConfig(newIndex).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
+    assertTrue(_baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
         IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
   }
 
@@ -448,7 +453,7 @@ public class BaseTableDataManagerNeedRefreshTest {
         .build();
 
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -470,7 +475,7 @@ public class BaseTableDataManagerNeedRefreshTest {
         .build();
 
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -492,7 +497,7 @@ public class BaseTableDataManagerNeedRefreshTest {
         .build();
 
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -504,7 +509,7 @@ public class BaseTableDataManagerNeedRefreshTest {
         new StarTreeIndexConfig(List.of("Carrier"), null, 
List.of(AggregationFunctionColumnPair.COUNT_STAR_NAME), null,
             100);
     TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
+    assertTrue(_baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
         IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
   }
 
@@ -529,7 +534,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -552,7 +557,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -575,7 +580,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -598,7 +603,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -621,7 +626,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertFalse(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -642,7 +647,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -662,7 +667,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -683,7 +688,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertFalse(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -705,7 +710,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -727,7 +732,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -751,7 +756,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertFalse(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -771,7 +776,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -784,7 +789,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
         createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, 
SCHEMA), _testName, generateRows());
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentDataManager).isStale());
+    assertTrue(_baseTableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentDataManager).isStale());
   }
 
   @Test
@@ -805,7 +810,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig, 
newStarTreeIndexConfig)).build();
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -824,7 +829,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -840,7 +845,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
     ImmutableSegmentDataManager segmentDataManager =
         createImmutableSegmentDataManager(indexLoadingConfig, _testName, 
generateRows());
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
segmentDataManager).isStale());
+    assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
segmentDataManager).isStale());
   }
 
   @Test
@@ -859,7 +864,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig newTableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
     assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+        _baseTableDataManager.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
             .isStale());
   }
 
@@ -877,7 +882,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     ImmutableSegmentDataManager segmentDataManager =
         createImmutableSegmentDataManager(indexLoadingConfig, _testName, 
generateRows());
 
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
segmentDataManager).isStale());
+    assertFalse(_baseTableDataManager.isSegmentStale(indexLoadingConfig, 
segmentDataManager).isStale());
   }
 
   @Test
@@ -894,7 +899,7 @@ public class BaseTableDataManagerNeedRefreshTest {
 
     // Segment was created without the timestamp index.
     StaleSegment response =
-        
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigWithTimestampIndex, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
+        
_baseTableDataManager.isSegmentStale(indexLoadingConfigWithTimestampIndex, 
IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "timestamp index changed");
   }
@@ -917,7 +922,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     // Evaluate staleness against a fresh config with no timestamp index.
     IndexLoadingConfig indexLoadingConfigNoTimestamp = new 
IndexLoadingConfig(getTableConfigBuilder().build(),
         getSchema());
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigNoTimestamp, 
segmentDataManager);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfigNoTimestamp, 
segmentDataManager);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "timestamp index changed");
   }
@@ -942,7 +947,7 @@ public class BaseTableDataManagerNeedRefreshTest {
         
getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDayAndWeek)).build();
     IndexLoadingConfig indexLoadingConfigDayAndWeek = new 
IndexLoadingConfig(tableConfigDayAndWeek, getSchema());
 
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDayAndWeek, 
segmentDataManager);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfigDayAndWeek, 
segmentDataManager);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "timestamp index changed");
   }
@@ -968,7 +973,7 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig tableConfigDay = 
getTableConfigBuilder().setFieldConfigList(List.of(fieldConfigDay)).build();
     IndexLoadingConfig indexLoadingConfigDay = new 
IndexLoadingConfig(tableConfigDay, getSchema());
 
-    StaleSegment response = 
BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfigDay, 
segmentDataManager);
+    StaleSegment response = 
_baseTableDataManager.isSegmentStale(indexLoadingConfigDay, segmentDataManager);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "timestamp index changed");
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index a21c210616c..88d7565c306 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -155,10 +155,14 @@ public class BaseTableDataManagerTest {
     SegmentMetadata localMetadata = mock(SegmentMetadata.class);
     when(localMetadata.getCrc()).thenReturn("0");
 
-    BaseTableDataManager tableDataManager = createTableManager();
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
     File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
     assertFalse(dataDir.exists());
-    tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(), 
zkMetadata, localMetadata, false);
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     assertTrue(dataDir.exists());
     assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
   }
@@ -175,18 +179,24 @@ public class BaseTableDataManagerTest {
     when(localMetadata.getCrc()).thenReturn("0");
 
     // No dataDir for coolTier, thus stay on default tier.
-    BaseTableDataManager tableDataManager = createTableManager();
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
+        .when(tableDataManager).fetchIndexLoadingConfig();
     File defaultDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
     assertFalse(defaultDataDir.exists());
-    tableDataManager.reloadSegment(SEGMENT_NAME, 
createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG), zkMetadata,
-        localMetadata, false);
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     assertTrue(defaultDataDir.exists());
     assertEquals(new SegmentMetadataImpl(defaultDataDir).getTotalDocs(), 5);
 
     // Configured dataDir for coolTier, thus move to new dir.
-    tableDataManager = createTableManager();
-    tableDataManager.reloadSegment(SEGMENT_NAME, 
createTierIndexLoadingConfig(TIER_TABLE_CONFIG), zkMetadata,
-        localMetadata, false);
+    tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
+        .when(tableDataManager).fetchIndexLoadingConfig();
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     File tierDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME, 
TIER_NAME, TIER_TABLE_CONFIG);
     assertTrue(tierDataDir.exists());
     assertFalse(defaultDataDir.exists());
@@ -205,15 +215,18 @@ public class BaseTableDataManagerTest {
     SegmentMetadata localMetadata = mock(SegmentMetadata.class);
     when(localMetadata.getCrc()).thenReturn(Long.toString(crc));
 
-    BaseTableDataManager tableDataManager = createTableManager();
-    tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(), 
zkMetadata, localMetadata, false);
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
     assertTrue(indexDir.exists());
     assertEquals(new SegmentMetadataImpl(indexDir).getTotalDocs(), 5);
 
     FileUtils.deleteQuietly(indexDir);
     try {
-      tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(), 
zkMetadata, localMetadata, false);
+      tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
       fail();
     } catch (Exception e) {
       // As expected, segment reloading fails due to missing the local segment 
dir.
@@ -234,17 +247,23 @@ public class BaseTableDataManagerTest {
     when(localMetadata.getCrc()).thenReturn(Long.toString(crc));
 
     // No dataDir for coolTier, thus stay on default tier.
-    BaseTableDataManager tableDataManager = createTableManager();
-    tableDataManager.reloadSegment(SEGMENT_NAME, 
createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG), zkMetadata,
-        localMetadata, false);
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
+        .when(tableDataManager).fetchIndexLoadingConfig();
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
     assertTrue(indexDir.exists());
     assertEquals(new SegmentMetadataImpl(indexDir).getTotalDocs(), 5);
 
     // Configured dataDir for coolTier, thus move to new dir.
-    tableDataManager = createTableManager();
-    tableDataManager.reloadSegment(SEGMENT_NAME, 
createTierIndexLoadingConfig(TIER_TABLE_CONFIG), zkMetadata,
-        localMetadata, false);
+    tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
+        .when(tableDataManager).fetchIndexLoadingConfig();
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     File tierDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME, 
TIER_NAME, TIER_TABLE_CONFIG);
     assertTrue(tierDataDir.exists());
     assertFalse(indexDir.exists());
@@ -267,8 +286,11 @@ public class BaseTableDataManagerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
     indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
 
-    BaseTableDataManager tableDataManager = createTableManager();
-    tableDataManager.reloadSegment(SEGMENT_NAME, indexLoadingConfig, 
zkMetadata, localMetadata, false);
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> 
indexLoadingConfig).when(tableDataManager).fetchIndexLoadingConfig();
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
     assertTrue(indexDir.exists());
     SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
@@ -295,8 +317,11 @@ public class BaseTableDataManagerTest {
         .setInvertedIndexColumns(List.of(STRING_COLUMN, LONG_COLUMN)).build();
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
 
-    BaseTableDataManager tableDataManager = createTableManager();
-    tableDataManager.reloadSegment(SEGMENT_NAME, indexLoadingConfig, 
zkMetadata, localMetadata, false);
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, localMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> 
indexLoadingConfig).when(tableDataManager).fetchIndexLoadingConfig();
+    tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
     assertEquals(tableDataManager.getSegmentDataDir(SEGMENT_NAME), indexDir);
     assertTrue(indexDir.exists());
     assertEquals(new SegmentMetadataImpl(indexDir).getTotalDocs(), 5);
@@ -311,22 +336,25 @@ public class BaseTableDataManagerTest {
     SegmentZKMetadata zkMetadata =
         makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION),
             false);
-
-    // Same CRC but force to download.
-    BaseTableDataManager tableDataManager = createTableManager();
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
     assertEquals(Long.parseLong(segmentMetadata.getCrc()), 
zkMetadata.getCrc());
 
+    // Same CRC but force to download.
+    BaseTableDataManager tableDataManager = spy(createTableManager());
+    tableDataManager.registerSegment(SEGMENT_NAME, 
createImmutableSegmentDataManager(SEGMENT_NAME, segmentMetadata));
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
+    doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
     // Remove the local segment dir. Segment reloading fails unless force to 
download.
     FileUtils.deleteQuietly(indexDir);
     try {
-      tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(), 
zkMetadata, segmentMetadata, false);
+      tableDataManager.reloadSegment(SEGMENT_NAME, false, null);
       fail();
     } catch (Exception e) {
       // As expected, segment reloading fails due to missing the local segment 
dir.
     }
 
-    tableDataManager.reloadSegment(SEGMENT_NAME, new IndexLoadingConfig(), 
zkMetadata, segmentMetadata, true);
+    tableDataManager.reloadSegment(SEGMENT_NAME, true, null);
     assertTrue(indexDir.exists());
     segmentMetadata = new SegmentMetadataImpl(indexDir);
     assertEquals(Long.parseLong(segmentMetadata.getCrc()), 
zkMetadata.getCrc());
@@ -415,7 +443,7 @@ public class BaseTableDataManagerTest {
     tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
segmentDataManager);
 
     // Mock the methods that will be called during segment replacement
-    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
     doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
 
     // Use CountDownLatch to wait for async execution
@@ -458,7 +486,7 @@ public class BaseTableDataManagerTest {
     tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
segmentDataManager);
 
     // Mock the methods that will be called during segment replacement
-    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
     doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
         .when(tableDataManager).fetchIndexLoadingConfig();
 
@@ -487,7 +515,7 @@ public class BaseTableDataManagerTest {
     tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
segmentDataManager);
 
     // Mock the methods for the second part of the test
-    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    seedZKMetadata(tableDataManager, SEGMENT_NAME, zkMetadata);
     doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
         .when(tableDataManager).fetchIndexLoadingConfig();
 
@@ -531,7 +559,7 @@ public class BaseTableDataManagerTest {
     tableDataManager._segmentDataManagerMap.put(segmentName, 
segmentDataManager);
 
     // Mock the methods that will be called during segment replacement
-    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(segmentName);
+    seedZKMetadata(tableDataManager, segmentName, zkMetadata);
     doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
 
     // Use CountDownLatch to wait for async execution
@@ -890,32 +918,56 @@ public class BaseTableDataManagerTest {
     }
   }
 
-  static OfflineTableDataManager createTableManager() {
+  protected BaseTableDataManager createTableManager() {
     return createTableManager(createDefaultInstanceDataManagerConfig());
   }
 
-  static OfflineTableDataManager 
createTableManagerWithAsyncSegmentRefreshEnabled() {
+  protected BaseTableDataManager 
createTableManagerWithAsyncSegmentRefreshEnabled() {
     return 
createTableManagerWithAsyncSegmentRefreshEnabled(createDefaultInstanceDataManagerConfig());
   }
 
-  private static OfflineTableDataManager 
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
-    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+  protected BaseTableDataManager createTableManager(InstanceDataManagerConfig 
instanceDataManagerConfig) {
+    BaseTableDataManager tableDataManager = newTableDataManager();
+    tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
         SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
         SEGMENT_OPERATIONS_THROTTLER, false, 
mock(ServerReloadJobStatusCache.class));
     return tableDataManager;
   }
 
-  private static OfflineTableDataManager 
createTableManagerWithAsyncSegmentRefreshEnabled(
+  protected BaseTableDataManager 
createTableManagerWithAsyncSegmentRefreshEnabled(
       InstanceDataManagerConfig instanceDataManagerConfig) {
-    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+    BaseTableDataManager tableDataManager = newTableDataManager();
+    tableDataManager.init(instanceDataManagerConfig, createHelixManagerMock(), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
         SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
         SEGMENT_OPERATIONS_THROTTLER, true, 
mock(ServerReloadJobStatusCache.class));
     return tableDataManager;
   }
 
-  private static InstanceDataManagerConfig 
createDefaultInstanceDataManagerConfig() {
+  /**
+   * Returns the concrete {@link BaseTableDataManager} instance under test. 
Default returns a stock
+   * {@link OfflineTableDataManager}; subclasses override to test a different 
implementation while inheriting
+   * all test bodies.
+   */
+  protected BaseTableDataManager newTableDataManager() {
+    return new OfflineTableDataManager();
+  }
+
+  /**
+   * Returns the {@link HelixManager} mock wired into the TDM under test. 
Default returns a bare Mockito mock
+   * (no property store stubbed) — fine for the inherited test bodies which 
never read ZK directly. Subclasses
+   * that exercise paths reading {@code _propertyStore} (e.g. {@code 
fetchZKMetadata}, {@code fetchIndexLoadingConfig})
+   * override to stub {@code helixManager.getHelixPropertyStore()} with a 
{@code FakePropertyStore} pre-seeded
+   * with table config + schema + per-segment ZK metadata.
+   */
+  protected HelixManager createHelixManagerMock() {
+    return mock(HelixManager.class);
+  }
+
+  protected void seedZKMetadata(BaseTableDataManager spy, String segmentName, 
SegmentZKMetadata zkMetadata) {
+    doAnswer(invocation -> zkMetadata).when(spy).fetchZKMetadata(segmentName);
+  }
+
+  protected static InstanceDataManagerConfig 
createDefaultInstanceDataManagerConfig() {
     InstanceDataManagerConfig config = mock(InstanceDataManagerConfig.class);
     when(config.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     // Check CRC matching on segment load time.
@@ -923,7 +975,7 @@ public class BaseTableDataManagerTest {
     return config;
   }
 
-  private static File createSegment(SegmentVersion segmentVersion, int numRows)
+  protected static File createSegment(SegmentVersion segmentVersion, int 
numRows)
       throws Exception {
     SegmentGeneratorConfig config = new 
SegmentGeneratorConfig(DEFAULT_TABLE_CONFIG, SCHEMA);
     config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
@@ -942,14 +994,14 @@ public class BaseTableDataManagerTest {
     return new File(TABLE_DATA_DIR, SEGMENT_NAME);
   }
 
-  private static SegmentZKMetadata createRawSegment(SegmentVersion 
segmentVersion, int numRows)
+  protected static SegmentZKMetadata createRawSegment(SegmentVersion 
segmentVersion, int numRows)
       throws Exception {
     File indexDir = createSegment(segmentVersion, numRows);
     return makeRawSegment(indexDir,
         new File(TEMP_DIR, SEGMENT_NAME + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
   }
 
-  private static SegmentZKMetadata makeRawSegment(File indexDir, File 
rawSegmentFile, boolean deleteIndexDir)
+  protected static SegmentZKMetadata makeRawSegment(File indexDir, File 
rawSegmentFile, boolean deleteIndexDir)
       throws Exception {
     long crc = getCRC(indexDir);
     SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME);
@@ -962,7 +1014,7 @@ public class BaseTableDataManagerTest {
     return zkMetadata;
   }
 
-  private static long getCRC(File indexDir)
+  protected static long getCRC(File indexDir)
       throws IOException {
     File creationMetaFile = 
SegmentDirectoryPaths.findCreationMetaFile(indexDir);
     assertNotNull(creationMetaFile);
@@ -971,7 +1023,7 @@ public class BaseTableDataManagerTest {
     }
   }
 
-  private IndexLoadingConfig createTierIndexLoadingConfig(TableConfig 
tableConfig) {
+  protected IndexLoadingConfig createTierIndexLoadingConfig(TableConfig 
tableConfig) {
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
     
when(instanceDataManagerConfig.getSegmentDirectoryLoader()).thenReturn(TIER_SEGMENT_DIRECTORY_LOADER);
     when(instanceDataManagerConfig.getConfig()).thenReturn(new 
PinotConfiguration());
@@ -981,14 +1033,19 @@ public class BaseTableDataManagerTest {
     return indexLoadingConfig;
   }
 
-  private ImmutableSegmentDataManager createImmutableSegmentDataManager(String 
segmentName, long crc) {
+  protected ImmutableSegmentDataManager 
createImmutableSegmentDataManager(String segmentName, long crc) {
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getCrc()).thenReturn(Long.toString(crc));
+    return createImmutableSegmentDataManager(segmentName, segmentMetadata);
+  }
+
+  protected ImmutableSegmentDataManager 
createImmutableSegmentDataManager(String segmentName,
+      SegmentMetadata segmentMetadata) {
     ImmutableSegmentDataManager segmentDataManager = 
mock(ImmutableSegmentDataManager.class);
     when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
     ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
     when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
-    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
     when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
-    when(segmentMetadata.getCrc()).thenReturn(Long.toString(crc));
     return segmentDataManager;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
index 2edd3c134a2..e8f5796d73c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
@@ -99,8 +99,6 @@ public abstract class SegmentDataManager {
    * The data manager can only be destroyed once.
    */
   public void destroy() {
-    // NOTE: We want the test to catch the case when destroy is called without 
offloading, but not fail the production.
-    assert _offloaded.get() : "Cannot destroy segment data manager without 
offloading it first";
     offload();
 
     if (_destroyed.compareAndSet(false, true)) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 15a8a7565e6..cc12bfcec7a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -194,6 +194,13 @@ public interface TableDataManager {
   void offloadSegmentUnsafe(String segmentName)
       throws Exception;
 
+  /**
+   * Deletes a segment from this table — offloads it if currently loaded, then 
removes its on-disk data (the per-segment
+   * directory and any tier-specific artefacts).
+   */
+  void deleteSegment(String segmentName)
+      throws Exception;
+
   /**
    * Reloads an existing immutable segment for the table, which can be an 
OFFLINE or REALTIME table.
    * A new segment may be downloaded if the local one has a different CRC; or 
can be forced to download if
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index ebe224f73d9..4c409d211b4 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -48,6 +48,7 @@ import 
org.apache.pinot.common.config.provider.LogicalTableMetadataCache;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.BaseTableDataManager;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.LogicalTableContext;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
@@ -64,9 +65,6 @@ import 
org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -389,38 +387,23 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   public void deleteSegment(String tableNameWithType, String segmentName)
       throws Exception {
     LOGGER.info("Deleting segment: {} from table: {}", segmentName, 
tableNameWithType);
-    // Segment deletion is handled at instance level because table data 
manager might not exist. Acquire the lock here.
+    // Hold the per-segment lock around the TDM lookup so the lookup + delete 
is atomic vs. removeTableDataManager
+    // shutting the TDM down concurrently. The TDM's own deleteSegment 
re-acquires the same lock (ReentrantLock).
     Lock segmentLock = _segmentLocks.getLock(tableNameWithType, segmentName);
     segmentLock.lock();
     try {
-      // Check if the segment is still loaded, if so, offload it first.
-      // This might happen when the server disconnected from ZK and 
reconnected, and the segment is still loaded.
-      // TODO: Consider using table data manager to delete the segment. This 
will allow the table data manager to clean
-      //       up the segment data on all tiers. Note that table data manager 
might have not been created, and table
-      //       config might have been deleted at this point.
       TableDataManager tableDataManager = 
_tableDataManagerMap.get(tableNameWithType);
-      if (tableDataManager != null && 
tableDataManager.hasSegment(segmentName)) {
-        LOGGER.warn("Segment: {} from table: {} is still loaded, offloading it 
first", segmentName, tableNameWithType);
-        tableDataManager.offloadSegment(segmentName);
-      }
-      // Clean up the segment data on default tier unconditionally.
-      File segmentDir = getSegmentDataDirectory(tableNameWithType, 
segmentName);
-      if (segmentDir.exists()) {
-        FileUtils.deleteQuietly(segmentDir);
-        LOGGER.info("Deleted segment directory {} on default tier", 
segmentDir);
-      }
-      // We might clean up further more with the specific segment loader. But 
note that table data manager might have
-      // not been created, and table config might have been deleted at this 
point.
-      SegmentDirectoryLoader segmentLoader = 
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(
-          _instanceDataManagerConfig.getSegmentDirectoryLoader());
-      if (segmentLoader != null) {
-        LOGGER.info("Deleting segment: {} further with segment loader: {}", 
segmentName,
-            _instanceDataManagerConfig.getSegmentDirectoryLoader());
-        SegmentDirectoryLoaderContext ctx = new 
SegmentDirectoryLoaderContext.Builder().setSegmentName(segmentName)
-            .setTableDataDir(_instanceDataManagerConfig.getInstanceDataDir() + 
"/" + tableNameWithType).build();
-        segmentLoader.delete(ctx);
+      if (tableDataManager != null) {
+        // The TDM owns the offload-if-loaded prelude, the on-disk dir delete, 
and the tier-aware
+        // segment-directory-loader cleanup.
+        tableDataManager.deleteSegment(segmentName);
+      } else {
+        // Fallback: TDM can be null if it was never instantiated, or has 
already been removed via deleteTable.
+        // In that case, do a path-only cleanup keyed by segment name.
+        String tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() 
+ "/" + tableNameWithType;
+        BaseTableDataManager.deleteSegmentFilesFromDisk(tableDataDir, 
segmentName, _instanceDataManagerConfig);
+        LOGGER.info("Deleted segment: {} from table: {}", segmentName, 
tableNameWithType);
       }
-      LOGGER.info("Deleted segment: {} from table: {}", segmentName, 
tableNameWithType);
     } finally {
       segmentLock.unlock();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to