klsince commented on code in PR #12886:
URL: https://github.com/apache/pinot/pull/12886#discussion_r1569585817


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -268,64 +282,192 @@ public void addSegment(ImmutableSegment 
immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
     Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
         segmentName, _tableNameWithType);
-    _logger.info("Adding immutable segment: {} to table: {}", segmentName, 
_tableNameWithType);
+    _logger.info("Adding immutable segment: {}", segmentName);
     _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.DOCUMENT_COUNT,
         immutableSegment.getSegmentMetadata().getTotalDocs());
     _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
 
     ImmutableSegmentDataManager newSegmentManager = new 
ImmutableSegmentDataManager(immutableSegment);
     SegmentDataManager oldSegmentManager = registerSegment(segmentName, 
newSegmentManager);
     if (oldSegmentManager == null) {
-      _logger.info("Added new immutable segment: {} to table: {}", 
segmentName, _tableNameWithType);
+      _logger.info("Added new immutable segment: {}", segmentName);
     } else {
-      _logger.info("Replaced immutable segment: {} of table: {}", segmentName, 
_tableNameWithType);
+      _logger.info("Replaced immutable segment: {}", segmentName);
       releaseSegment(oldSegmentManager);
     }
   }
 
   @Override
-  public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
+  public void addOnlineSegment(String segmentName)
       throws Exception {
-    Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
-        indexDir.getName(), _tableNameWithType);
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot add ONLINE segment: 
%s to table: %s", segmentName,
+        _tableNameWithType);
+    _logger.info("Adding ONLINE segment: {} to table: {}", segmentName, 
_tableNameWithType);

Review Comment:
   as in the other places, no need to log `_tableNameWithType`?
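   i.e., something like (matching the pattern used above):
   ```
   _logger.info("Adding ONLINE segment: {}", segmentName);
   ```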



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String 
segmentName) {
     }
   }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    return true;
-  }
-
-  protected File downloadSegment(String segmentName, SegmentZKMetadata 
zkMetadata)
-      throws Exception {
-    // TODO: may support download from peer servers for RealTime table.
-    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
-  }
-
-  private File downloadSegmentFromDeepStore(String segmentName, 
SegmentZKMetadata zkMetadata)
+  /**
+   * Downloads an immutable segment into the index directory.
+   * Segment can be downloaded from deep store or from peer servers. 
Downloaded segment might be compressed or
+   * encrypted, and this method takes care of decompressing and decrypting the 
segment.
+   */
+  protected File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
-    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
-      try {
-        File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, 
zkMetadata, tempRootDir,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        return moveSegment(segmentName, untaredSegDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    } else {
-      try {
-        File tarFile = downloadAndDecrypt(segmentName, zkMetadata, 
tempRootDir);
-        return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    }
-  }
-
-  private File moveSegment(String segmentName, File untaredSegDir)
-      throws IOException {
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    Preconditions.checkState(downloadUrl != null,
+        "Failed to find download URL in ZK metadata for segment: %s of table: 
%s", segmentName, _tableNameWithType);
     try {
-      File indexDir = getSegmentDataDir(segmentName);
-      FileUtils.deleteDirectory(indexDir);
-      FileUtils.moveDirectory(untaredSegDir, indexDir);
-      return indexDir;
+      if 
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+        try {
+          return downloadSegmentFromDeepStore(zkMetadata);
+        } catch (Exception e) {
+          if (_peerDownloadScheme != null) {

Review Comment:
   add a log noting that this is a retry after downloading from deep store failed
   
   nit: to save some indents:
   ```
   if (is peer download) {
     return downloadSegmentFromPeers(zkMetadata);
   }
   ...
   ```
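   e.g., a fuller sketch of the method body with the early return plus the retry log (identifiers as in this diff; a sketch, not a definitive implementation):
   ```
   try {
     if (CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
       return downloadSegmentFromPeers(zkMetadata);
     }
     try {
       return downloadSegmentFromDeepStore(zkMetadata);
     } catch (Exception e) {
       if (_peerDownloadScheme == null) {
         throw e;
       }
       // Deep store download failed; retry from peer servers since peer download is configured
       _logger.warn("Failed to download segment: {} from deep store, retrying from peers", segmentName, e);
       return downloadSegmentFromPeers(zkMetadata);
     }
   } catch (Exception e) {
     _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1);
     throw e;
   }
   ```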



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment 
immutableSegment) {
       _logger.info("Preloaded immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
       return;
     }
-    // Replacing segment takes multiple steps, and particularly need to access 
the oldSegment. Replace segment may
-    // happen in two threads, i.e. the consuming thread that's committing the 
mutable segment and a HelixTaskExecutor
-    // thread that's bringing segment from ONLINE to CONSUMING when the server 
finds consuming thread can't commit
-    // the segment in time. The slower thread takes the reference of the 
oldSegment here, but it may get closed by
-    // the faster thread if not synchronized. In particular, the slower thread 
may iterate the primary keys in the
-    // oldSegment, causing seg fault. So we have to take a lock here.
-    // However, we can't just reuse the existing segmentLocks. Because many 
methods of partitionUpsertMetadataManager
-    // takes this lock internally, but after taking snapshot RW lock. If we 
take segmentLock here (before taking
-    // snapshot RW lock), we can get into deadlock with threads calling 
partitionUpsertMetadataManager's other
-    // methods, like removeSegment.
-    // Adding segment should be done by a single HelixTaskExecutor thread, but 
do it with lock here for simplicity
-    // otherwise, we'd need to double-check if oldSegmentManager is null.
-    Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, 
segmentName);
-    segmentLock.lock();
-    try {
-      SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
-      if (oldSegmentManager == null) {
-        // When adding a new segment, we should register it 'before' it is 
fully initialized by
-        // partitionUpsertMetadataManager. Because when processing docs in the 
new segment, the docs in the other
-        // segments may be invalidated, making the queries see less valid docs 
than expected. We should let query
-        // access the new segment asap even though its validDocId bitmap is 
still being filled by
-        // partitionUpsertMetadataManager.
-        registerSegment(segmentName, newSegmentManager);
-        partitionUpsertMetadataManager.addSegment(immutableSegment);
-        _logger.info("Added new immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
-      } else {
-        // When replacing a segment, we should register the new segment 
'after' it is fully initialized by
-        // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
-        // to the valid docs in the old segment immediately, but the 
validDocId bitmap of the new segment is still
-        // being filled by partitionUpsertMetadataManager, making the queries 
see less valid docs than expected.
-        // When replacing a segment, the new and old segments are assumed to 
have same set of valid docs for data
-        // consistency, otherwise the new segment should be named differently 
to go through the addSegment flow above.
-        IndexSegment oldSegment = oldSegmentManager.getSegment();
-        partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
-        registerSegment(segmentName, newSegmentManager);
-        _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
-            oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, _tableNameWithType);
-        releaseSegment(oldSegmentManager);
-      }
-    } finally {
-      segmentLock.unlock();
-    }
-  }
-
-  @Override
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    // Cannot download consuming segment
-    if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
-      return false;
-    }
-    // TODO: may support download from peer servers as well.
-    return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
-  }
-
-  void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata 
segmentZKMetadata,
-      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
-    String uri = segmentZKMetadata.getDownloadUrl();
-    if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
-      try {
-        // TODO: cleanup and consolidate the segment loading logic a bit for 
OFFLINE and REALTIME tables.
-        //       https://github.com/apache/pinot/issues/9752
-        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
-      } catch (Exception e) {
-        _logger.warn("Download segment {} from deepstore uri {} failed.", 
segmentName, uri, e);
-        // Download from deep store failed; try to download from peer if peer 
download is setup for the table.
-        if (_peerDownloadScheme != null) {
-          downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-        } else {
-          throw e;
-        }
-      }
+    SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager == null) {
+      // When adding a new segment, we should register it 'before' it is fully 
initialized by
+      // partitionUpsertMetadataManager. Because when processing docs in the 
new segment, the docs in the other
+      // segments may be invalidated, making the queries see less valid docs 
than expected. We should let query
+      // access the new segment asap even though its validDocId bitmap is 
still being filled by
+      // partitionUpsertMetadataManager.
+      registerSegment(segmentName, newSegmentManager);
+      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      _logger.info("Added new immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
     } else {
-      if (_peerDownloadScheme != null) {
-        downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-      } else {
-        throw new RuntimeException("Peer segment download not enabled for 
segment " + segmentName);
-      }
-    }
-  }
-
-  private void downloadSegmentFromDeepStore(String segmentName, 
IndexLoadingConfig indexLoadingConfig, String uri) {
-    // This could leave temporary directories in _indexDir if JVM shuts down 
before the temp directory is deleted.
-    // This is fine since the temporary directories are deleted when the table 
data manager calls init.
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + 
System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
-      _logger.info("Downloaded file from {} to {}; Length of downloaded file: 
{}", uri, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, 
tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Failed to download segment {} from deep store: ", 
segmentName, e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
+      // When replacing a segment, we should register the new segment 'after' 
it is fully initialized by
+      // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
+      // to the valid docs in the old segment immediately, but the validDocId 
bitmap of the new segment is still
+      // being filled by partitionUpsertMetadataManager, making the queries 
see less valid docs than expected.
+      // When replacing a segment, the new and old segments are assumed to 
have same set of valid docs for data
+      // consistency, otherwise the new segment should be named differently to 
go through the addSegment flow above.
+      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager);
+      _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, _tableNameWithType);
+      releaseSegment(oldSegmentManager);
     }
   }
 
   /**
-   * Untars the new segment and replaces the existing segment.
+   * Replaces the CONSUMING segment with a downloaded sealed one.
    */
-  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, File segmentTarFile,
-      File tempRootDir)
-      throws IOException {
-    File untarDir = new File(tempRootDir, segmentName);
-    File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, 
untarDir).get(0);
-    _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, 
untarDir);
-    File indexDir = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(indexDir);
-    FileUtils.moveDirectory(untaredSegDir, indexDir);
-    _logger.info("Replacing LLC Segment {}", segmentName);
-    replaceLLSegment(segmentName, indexLoadingConfig);
-  }
-
-  private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig 
indexLoadingConfig) {
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + 
System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      // Next download the segment from a randomly chosen server using 
configured download scheme (http or https).
-      
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
 () -> {
-        List<URI> peerServerURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
_tableNameWithType, segmentName,
-                _peerDownloadScheme);
-        Collections.shuffle(peerServerURIs);
-        return peerServerURIs;
-      }, segmentTarFile);
-      _logger.info("Fetched segment {} successfully to {} of size {}", 
segmentName, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, 
tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Download and move segment {} from peer with scheme {} 
failed.", segmentName, _peerDownloadScheme,
-          e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
-    }
+  public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
+    _logger.info("Downloading and replacing CONSUMING segment: {} with sealed 
one", segmentName);
+    File indexDir = downloadSegment(zkMetadata);
+    // Get a new index loading config with latest table config and schema to 
load the segment
+    IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null);

Review Comment:
   why not pass zkMetadata into this method?
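   i.e., using the `getIndexLoadingConfig(@Nullable SegmentZKMetadata)` overload added earlier in this PR, which also picks up the segment tier:
   ```
   IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
   ```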



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -428,17 +570,25 @@ public ExecutorService getSegmentPreloadExecutor() {
 
   @Override
   public void addSegmentError(String segmentName, SegmentErrorInfo 
segmentErrorInfo) {
-    _errorCache.put(Pair.of(_tableNameWithType, segmentName), 
segmentErrorInfo);
+    if (_errorCache != null) {
+      _errorCache.put(Pair.of(_tableNameWithType, segmentName), 
segmentErrorInfo);
+    }
   }
 
   @Override
   public Map<String, SegmentErrorInfo> getSegmentErrors() {
-    if (_errorCache == null) {
-      return Collections.emptyMap();
+    if (_errorCache != null) {
+      // Filter out entries that match the table name
+      Map<String, SegmentErrorInfo> segmentErrors = new HashMap<>();
+      for (Map.Entry<Pair<String, String>, SegmentErrorInfo> entry : 
_errorCache.asMap().entrySet()) {
+        Pair<String, String> tableSegmentPair = entry.getKey();
+        if (tableSegmentPair.getLeft().equals(_tableNameWithType)) {
+          segmentErrors.put(tableSegmentPair.getRight(), entry.getValue());
+        }
+      }
+      return segmentErrors;
     } else {
-      // Filter out entries that match the table name.
-      return _errorCache.asMap().entrySet().stream().filter(map -> 
map.getKey().getLeft().equals(_tableNameWithType))
-          .collect(Collectors.toMap(map -> map.getKey().getRight(), 
Map.Entry::getValue));
+      return Map.of();

Review Comment:
   nit: why not continue to use `Collections.emptyMap()`? I didn't see `Map.of()` used elsewhere in the repo; IIRC it caused compile issues on older JDKs (`Map.of()` needs Java 9+, so it would break a Java 8 build).
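   i.e., simply:
   ```
   return Collections.emptyMap();
   ```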



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -268,64 +282,192 @@ public void addSegment(ImmutableSegment 
immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
     Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
         segmentName, _tableNameWithType);
-    _logger.info("Adding immutable segment: {} to table: {}", segmentName, 
_tableNameWithType);
+    _logger.info("Adding immutable segment: {}", segmentName);
     _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.DOCUMENT_COUNT,
         immutableSegment.getSegmentMetadata().getTotalDocs());
     _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
 
     ImmutableSegmentDataManager newSegmentManager = new 
ImmutableSegmentDataManager(immutableSegment);
     SegmentDataManager oldSegmentManager = registerSegment(segmentName, 
newSegmentManager);
     if (oldSegmentManager == null) {
-      _logger.info("Added new immutable segment: {} to table: {}", 
segmentName, _tableNameWithType);
+      _logger.info("Added new immutable segment: {}", segmentName);
     } else {
-      _logger.info("Replaced immutable segment: {} of table: {}", segmentName, 
_tableNameWithType);
+      _logger.info("Replaced immutable segment: {}", segmentName);
       releaseSegment(oldSegmentManager);
     }
   }
 
   @Override
-  public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
+  public void addOnlineSegment(String segmentName)
       throws Exception {
-    Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
-        indexDir.getName(), _tableNameWithType);
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot add ONLINE segment: 
%s to table: %s", segmentName,
+        _tableNameWithType);
+    _logger.info("Adding ONLINE segment: {} to table: {}", segmentName, 
_tableNameWithType);
+    Lock segmentLock = getSegmentLock(segmentName);
+    segmentLock.lock();
+    try {
+      doAddOnlineSegment(segmentName);
+    } catch (Exception e) {
+      addSegmentError(segmentName,
+          new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception 
while adding ONLINE segment", e));
+      throw e;
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  protected abstract void doAddOnlineSegment(String segmentName)
+      throws Exception;
+
+  protected SegmentZKMetadata getZKMetadata(String segmentName) {
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
_tableNameWithType, segmentName);
+    Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for segment: %s of table: %s", segmentName,
+        _tableNameWithType);
+    return zkMetadata;
+  }
+
+  protected TableConfig getTableConfig() {
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
+    return tableConfig;
+  }
+
+  @Nullable
+  protected Schema getSchema(TableConfig tableConfig) {
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableConfig);
+    // NOTE: Schema is mandatory for REALTIME table.
+    if (tableConfig.getTableType() == TableType.REALTIME) {
+      Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
+    }
+    return schema;
+  }
+
+  protected IndexLoadingConfig getIndexLoadingConfig(@Nullable 
SegmentZKMetadata zkMetadata) {
+    TableConfig tableConfig = getTableConfig();
+    Schema schema = getSchema(tableConfig);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
     
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
indexLoadingConfig.getSchema()));
+    if (zkMetadata != null) {
+      indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
+    }
+    return indexLoadingConfig;
   }
 
-  @Override
-  public void addSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata zkMetadata)
+  /**
+   * Adds a new ONLINE segment that is not already loaded.
+   */
+  protected void addNewOnlineSegment(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig)
+      throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
+    _logger.info("Adding new ONLINE segment: {}", segmentName);
+    if (!tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata)) {
+      downloadAndLoadSegment(zkMetadata, indexLoadingConfig);
+    }
+  }
+
+  /**
+   * Replaces an already loaded segment in a table if the segment has been 
overridden in the deep store (CRC mismatch).
+   */
+  protected void replaceSegmentIfCrcMismatch(SegmentDataManager 
segmentDataManager, SegmentZKMetadata zkMetadata,
+      IndexLoadingConfig indexLoadingConfig)
       throws Exception {
-    throw new UnsupportedOperationException();
+    String segmentName = segmentDataManager.getSegmentName();
+    Preconditions.checkState(segmentDataManager instanceof 
ImmutableSegmentDataManager,
+        "Cannot replace CONSUMING segment: %s in table: %s", segmentName, 
_tableNameWithType);
+    SegmentMetadata localMetadata = 
segmentDataManager.getSegment().getSegmentMetadata();
+    if (hasSameCRC(zkMetadata, localMetadata)) {
+      _logger.info("Segment: {} has CRC: {} same as before, not replacing it", 
segmentName, localMetadata.getCrc());
+      return;
+    }
+    _logger.info("Replacing segment: {} because its CRC has changed from: {} 
to: {}", segmentName,
+        localMetadata.getCrc(), zkMetadata.getCrc());
+    downloadAndLoadSegment(zkMetadata, indexLoadingConfig);
+    _logger.info("Replaced segment: {} with new CRC: {}", segmentName, 
zkMetadata.getCrc());
   }
 
   /**
-   * Called when we get a helix transition to go to offline or dropped state.
-   * We need to remove it safely, keeping in mind that there may be queries 
that are
-   * using the segment,
-   * @param segmentName name of the segment to remove.
+   * Downloads a segment and loads it into the table.
    */
+  protected void downloadAndLoadSegment(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig)
+      throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
+    _logger.info("Downloading and loading segment: {}", segmentName);
+    File indexDir = downloadSegment(zkMetadata);
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig));
+    _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", 
segmentName, zkMetadata.getCrc(),
+        TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
+  }
+
   @Override
-  public void removeSegment(String segmentName) {
-    // Allow removing segment after shutdown so that we can remove the segment 
when the table is deleted
+  public void replaceSegment(String segmentName)
+      throws Exception {
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot replace segment: %s 
in table: %s", segmentName,
+        _tableNameWithType);
+    _logger.info("Replacing segment: {}", segmentName);
+    Lock segmentLock = getSegmentLock(segmentName);
+    segmentLock.lock();
+    try {
+      doReplaceSegment(segmentName);
+    } catch (Exception e) {
+      addSegmentError(segmentName,
+          new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception 
while replacing segment", e));
+      throw e;
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  protected void doReplaceSegment(String segmentName)
+      throws Exception {
+    SegmentDataManager segmentDataManager = 
_segmentDataManagerMap.get(segmentName);
+    if (segmentDataManager != null) {
+      SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+      IndexLoadingConfig indexLoadingConfig = 
getIndexLoadingConfig(zkMetadata);
+      replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, 
indexLoadingConfig);
+    } else {
+      _logger.warn("Failed to find segment: {}, skipping replacing it", 
segmentName);
+    }
+  }
+
+  @Override
+  public void offloadSegment(String segmentName) {
+    // NOTE: Do not throw exception when data manager has been shut down. This 
is regular flow when a table is deleted.
     if (_shutDown) {
-      _logger.info("Table data manager is already shut down, skip removing 
segment: {} from table: {}", segmentName,
-          _tableNameWithType);
+      _logger.info("Table data manager is already shut down, skipping 
offloading segment: {}", segmentName);
       return;
     }
-    _logger.info("Removing segment: {} from table: {}", segmentName, 
_tableNameWithType);
+    _logger.info("Offloading segment: {}", segmentName);
+    Lock segmentLock = getSegmentLock(segmentName);
+    segmentLock.lock();
+    try {
+      doOffloadSegment(segmentName);
+    } catch (Exception e) {
+      addSegmentError(segmentName,
+          new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception 
while offloading segment", e));
+      throw e;
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  protected void doOffloadSegment(String segmentName) {
     SegmentDataManager segmentDataManager = unregisterSegment(segmentName);
     if (segmentDataManager != null) {
+      segmentDataManager.offload();
       releaseSegment(segmentDataManager);
-      _logger.info("Removed segment: {} from table: {}", segmentName, 
_tableNameWithType);
+      _logger.info("Offloaded segment: {}", segmentName);
     } else {
-      _logger.info("Failed to find segment: {} in table: {}", segmentName, 
_tableNameWithType);
+      _logger.warn("Failed to find segment: {}, skipping offloading it", 
_tableNameWithType);

Review Comment:
   s/_tableNameWithType/segmentName
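   i.e.:
   ```
   _logger.warn("Failed to find segment: {}, skipping offloading it", segmentName);
   ```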



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String 
segmentName) {
     }
   }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    return true;
-  }
-
-  protected File downloadSegment(String segmentName, SegmentZKMetadata 
zkMetadata)
-      throws Exception {
-    // TODO: may support download from peer servers for RealTime table.
-    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
-  }
-
-  private File downloadSegmentFromDeepStore(String segmentName, 
SegmentZKMetadata zkMetadata)
+  /**
+   * Downloads an immutable segment into the index directory.
+   * Segment can be downloaded from deep store or from peer servers. 
Downloaded segment might be compressed or
+   * encrypted, and this method takes care of decompressing and decrypting the 
segment.
+   */
+  protected File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
-    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
-      try {
-        File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, 
zkMetadata, tempRootDir,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        return moveSegment(segmentName, untaredSegDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    } else {
-      try {
-        File tarFile = downloadAndDecrypt(segmentName, zkMetadata, 
tempRootDir);
-        return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    }
-  }
-
-  private File moveSegment(String segmentName, File untaredSegDir)
-      throws IOException {
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    Preconditions.checkState(downloadUrl != null,
+        "Failed to find download URL in ZK metadata for segment: %s of table: 
%s", segmentName, _tableNameWithType);
     try {
-      File indexDir = getSegmentDataDir(segmentName);
-      FileUtils.deleteDirectory(indexDir);
-      FileUtils.moveDirectory(untaredSegDir, indexDir);
-      return indexDir;
+      if 
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+        try {
+          return downloadSegmentFromDeepStore(zkMetadata);
+        } catch (Exception e) {
+          if (_peerDownloadScheme != null) {
+            return downloadSegmentFromPeers(zkMetadata);
+          } else {
+            throw e;
+          }
+        }
+      } else {
+        return downloadSegmentFromPeers(zkMetadata);
+      }
     } catch (Exception e) {
-      LOGGER.error("Failed to move segment: {} of table: {}", segmentName, 
_tableNameWithType);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1);
       throw e;
     }
   }
 
   @VisibleForTesting
-  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, 
File tempRootDir)
+  File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    String uri = zkMetadata.getDownloadUrl();
-    boolean downloadSuccess = false;
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl);
+    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
+    if (_segmentDownloadSemaphore != null) {
+      long startTime = System.currentTimeMillis();
+      _logger.info("Acquiring segment download semaphore for segment: {}, 
queue-length: {} ", segmentName,
+          _segmentDownloadSemaphore.getQueueLength());
+      _segmentDownloadSemaphore.acquire();
+      _logger.info("Acquired segment download semaphore for segment: {} 
(lock-time={}ms, queue-length={}).",
+          segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
+    }
     try {
-      if (_segmentDownloadSemaphore != null) {
-        long startTime = System.currentTimeMillis();
-        LOGGER.info("Trying to acquire segment download semaphore for: {}. 
queue-length: {} ", segmentName,
-            _segmentDownloadSemaphore.getQueueLength());
-        _segmentDownloadSemaphore.acquire();
-        LOGGER.info("Acquired segment download semaphore for: {} 
(lock-time={}ms, queue-length={}).", segmentName,
-            System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
-      }
-      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, 
zkMetadata.getCrypterName());
-      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: 
{}, file length: {}", segmentName,
-          _tableNameWithType, uri, tarFile, tarFile.length());
-      downloadSuccess = true;
-      return tarFile;
-    } catch (AttemptsExceededException e) {
-      LOGGER.error("Attempts exceeded when downloading segment: {} for table: 
{} from: {} to: {}", segmentName,
-          _tableNameWithType, uri, tarFile);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
-      if (_peerDownloadScheme == null) {
-        throw e;
+      File untarredSegmentDir;
+      if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
+        _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}", segmentName,
+            _streamSegmentDownloadUntarRateLimitBytesPerSec);
+        AtomicInteger attempts = new AtomicInteger(0);
+        try {
+          untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+              _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts);
+          _logger.info("Downloaded and untarred segment: {} from: {}, 
attempts: {}", segmentName, downloadUrl,
+              attempts.get());
+        } finally {
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,

Review Comment:
   not always failures? The `finally` block adds `attempts` to this failure meter even when the download ultimately succeeds.
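   assuming `attempts` counts every try (including the successful one), a sketch that meters only the failed tries could replace the `try`/`finally` around `fetchAndStreamUntarToLocal`:
   ```
   AtomicInteger attempts = new AtomicInteger(0);
   try {
     untarredSegmentDir = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
         _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts);
     // The last attempt succeeded, so only the earlier tries failed
     _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
         attempts.get() - 1);
   } catch (Exception e) {
     // All attempts failed
     _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
         attempts.get());
     throw e;
   }
   ```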



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String 
segmentName) {
     }
   }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    return true;
-  }
-
-  protected File downloadSegment(String segmentName, SegmentZKMetadata 
zkMetadata)
-      throws Exception {
-    // TODO: may support download from peer servers for RealTime table.
-    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
-  }
-
-  private File downloadSegmentFromDeepStore(String segmentName, 
SegmentZKMetadata zkMetadata)
+  /**
+   * Downloads an immutable segment into the index directory.
+   * Segment can be downloaded from deep store or from peer servers. 
Downloaded segment might be compressed or
+   * encrypted, and this method takes care of decompressing and decrypting the 
segment.
+   */
+  protected File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
-    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
-      try {
-        File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, 
zkMetadata, tempRootDir,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        return moveSegment(segmentName, untaredSegDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    } else {
-      try {
-        File tarFile = downloadAndDecrypt(segmentName, zkMetadata, 
tempRootDir);
-        return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    }
-  }
-
-  private File moveSegment(String segmentName, File untaredSegDir)
-      throws IOException {
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    Preconditions.checkState(downloadUrl != null,
+        "Failed to find download URL in ZK metadata for segment: %s of table: 
%s", segmentName, _tableNameWithType);
     try {
-      File indexDir = getSegmentDataDir(segmentName);
-      FileUtils.deleteDirectory(indexDir);
-      FileUtils.moveDirectory(untaredSegDir, indexDir);
-      return indexDir;
+      if 
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+        try {
+          return downloadSegmentFromDeepStore(zkMetadata);
+        } catch (Exception e) {
+          if (_peerDownloadScheme != null) {
+            return downloadSegmentFromPeers(zkMetadata);
+          } else {
+            throw e;
+          }
+        }
+      } else {
+        return downloadSegmentFromPeers(zkMetadata);
+      }
     } catch (Exception e) {
-      LOGGER.error("Failed to move segment: {} of table: {}", segmentName, 
_tableNameWithType);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1);
       throw e;
     }
   }
 
   @VisibleForTesting
-  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, 
File tempRootDir)
+  File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    String uri = zkMetadata.getDownloadUrl();
-    boolean downloadSuccess = false;
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl);
+    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
+    if (_segmentDownloadSemaphore != null) {
+      long startTime = System.currentTimeMillis();
+      _logger.info("Acquiring segment download semaphore for segment: {}, 
queue-length: {} ", segmentName,
+          _segmentDownloadSemaphore.getQueueLength());
+      _segmentDownloadSemaphore.acquire();
+      _logger.info("Acquired segment download semaphore for segment: {} 
(lock-time={}ms, queue-length={}).",
+          segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
+    }
     try {
-      if (_segmentDownloadSemaphore != null) {
-        long startTime = System.currentTimeMillis();
-        LOGGER.info("Trying to acquire segment download semaphore for: {}. 
queue-length: {} ", segmentName,
-            _segmentDownloadSemaphore.getQueueLength());
-        _segmentDownloadSemaphore.acquire();
-        LOGGER.info("Acquired segment download semaphore for: {} 
(lock-time={}ms, queue-length={}).", segmentName,
-            System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
-      }
-      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, 
zkMetadata.getCrypterName());
-      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: 
{}, file length: {}", segmentName,
-          _tableNameWithType, uri, tarFile, tarFile.length());
-      downloadSuccess = true;
-      return tarFile;
-    } catch (AttemptsExceededException e) {
-      LOGGER.error("Attempts exceeded when downloading segment: {} for table: 
{} from: {} to: {}", segmentName,
-          _tableNameWithType, uri, tarFile);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
-      if (_peerDownloadScheme == null) {
-        throw e;
+      File untarredSegmentDir;
+      if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
+        _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}", segmentName,
+            _streamSegmentDownloadUntarRateLimitBytesPerSec);
+        AtomicInteger attempts = new AtomicInteger(0);
+        try {
+          untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+              _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts);
+          _logger.info("Downloaded and untarred segment: {} from: {}, 
attempts: {}", segmentName, downloadUrl,
+              attempts.get());
+        } finally {
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+              attempts.get());
+        }
+      } else {
+        File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, 
segmentTarFile, zkMetadata.getCrypterName());
+        _logger.info("Downloaded tarred segment: {} from: {} to: {}, file 
length: {}", segmentName, downloadUrl,
+            segmentTarFile, segmentTarFile.length());
+        untarredSegmentDir = untarSegment(segmentName, segmentTarFile, 
tempRootDir);
       }
-      downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile);
-      downloadSuccess = true;
-      return tarFile;
+      File indexDir = moveSegment(segmentName, untarredSegmentDir);
+      _logger.info("Downloaded segment: {} from: {} to: {}", segmentName, 
downloadUrl, indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1);
+      throw e;
     } finally {
-      if (!downloadSuccess) {
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
-      }
       if (_segmentDownloadSemaphore != null) {
         _segmentDownloadSemaphore.release();
       }
+      FileUtils.deleteQuietly(tempRootDir);
     }
   }
 
-  protected void downloadFromPeersWithoutStreaming(String segmentName, 
SegmentZKMetadata zkMetadata, File destTarFile)
+  @VisibleForTesting
+  File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
       throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
     Preconditions.checkState(_peerDownloadScheme != null, "Peer download is 
not enabled for table: %s",
         _tableNameWithType);
+    _logger.info("Downloading segment: {} from peers", segmentName);
+    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());

Review Comment:
   pull `File tempRootDir = ...` and `FileUtils.deleteQuietly(tempRootDir);` up into the caller `downloadSegment()` to save those few lines in `downloadSegmentFromDeepStore()` and `downloadSegmentFromPeers()`
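   roughly, in `downloadSegment()` (a sketch; the two helpers would then take `tempRootDir` as a parameter):
   ```
   File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
   try {
     // ... choose deep store vs. peers as above, passing tempRootDir down ...
   } finally {
     FileUtils.deleteQuietly(tempRootDir);
   }
   ```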



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -752,14 +752,20 @@ public void run() {
                   _state = State.DISCARDED;
                   break;
                 case DEFAULT:
-                  success = buildSegmentAndReplace();
-                  if (success) {
-                    _state = State.RETAINED;
-                  } else {
-                    // Could not build segment for some reason. We can only 
download it.
-                    _state = State.ERROR;
-                    _segmentLogger.error("Could not build segment for {}", 
_segmentNameStr);
+                  // Lock the segment to avoid multiple threads touching the 
same segment.
+                  Lock segmentLock = 
_realtimeTableDataManager.getSegmentLock(_segmentNameStr);
+                  segmentLock.lock();
+                  try {
+                    if (buildSegmentAndReplace()) {

Review Comment:
   move lock/unlock into buildSegmentAndReplace() method?
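   e.g. (a sketch keeping the lock scope the same; identifiers as in this diff):
   ```
   private boolean buildSegmentAndReplace() {
     // Lock the segment to avoid multiple threads touching the same segment
     Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
     segmentLock.lock();
     try {
       // ... existing build-and-replace logic, returning success/failure ...
     } finally {
       segmentLock.unlock();
     }
   }
   ```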



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment 
immutableSegment) {
       _logger.info("Preloaded immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
       return;
     }
-    // Replacing segment takes multiple steps, and particularly need to access 
the oldSegment. Replace segment may
-    // happen in two threads, i.e. the consuming thread that's committing the 
mutable segment and a HelixTaskExecutor
-    // thread that's bringing segment from ONLINE to CONSUMING when the server 
finds consuming thread can't commit
-    // the segment in time. The slower thread takes the reference of the 
oldSegment here, but it may get closed by
-    // the faster thread if not synchronized. In particular, the slower thread 
may iterate the primary keys in the
-    // oldSegment, causing seg fault. So we have to take a lock here.
-    // However, we can't just reuse the existing segmentLocks. Because many 
methods of partitionUpsertMetadataManager
-    // takes this lock internally, but after taking snapshot RW lock. If we 
take segmentLock here (before taking
-    // snapshot RW lock), we can get into deadlock with threads calling 
partitionUpsertMetadataManager's other
-    // methods, like removeSegment.
-    // Adding segment should be done by a single HelixTaskExecutor thread, but 
do it with lock here for simplicity
-    // otherwise, we'd need to double-check if oldSegmentManager is null.
-    Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, 
segmentName);
-    segmentLock.lock();
-    try {
-      SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
-      if (oldSegmentManager == null) {
-        // When adding a new segment, we should register it 'before' it is 
fully initialized by
-        // partitionUpsertMetadataManager. Because when processing docs in the 
new segment, the docs in the other
-        // segments may be invalidated, making the queries see less valid docs 
than expected. We should let query
-        // access the new segment asap even though its validDocId bitmap is 
still being filled by
-        // partitionUpsertMetadataManager.
-        registerSegment(segmentName, newSegmentManager);
-        partitionUpsertMetadataManager.addSegment(immutableSegment);
-        _logger.info("Added new immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
-      } else {
-        // When replacing a segment, we should register the new segment 
'after' it is fully initialized by
-        // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
-        // to the valid docs in the old segment immediately, but the 
validDocId bitmap of the new segment is still
-        // being filled by partitionUpsertMetadataManager, making the queries 
see less valid docs than expected.
-        // When replacing a segment, the new and old segments are assumed to 
have same set of valid docs for data
-        // consistency, otherwise the new segment should be named differently 
to go through the addSegment flow above.
-        IndexSegment oldSegment = oldSegmentManager.getSegment();
-        partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
-        registerSegment(segmentName, newSegmentManager);
-        _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
-            oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, _tableNameWithType);
-        releaseSegment(oldSegmentManager);
-      }
-    } finally {
-      segmentLock.unlock();
-    }
-  }
-
-  @Override
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    // Cannot download consuming segment
-    if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
-      return false;
-    }
-    // TODO: may support download from peer servers as well.
-    return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
-  }
-
-  void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata 
segmentZKMetadata,
-      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
-    String uri = segmentZKMetadata.getDownloadUrl();
-    if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
-      try {
-        // TODO: cleanup and consolidate the segment loading logic a bit for 
OFFLINE and REALTIME tables.
-        //       https://github.com/apache/pinot/issues/9752
-        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
-      } catch (Exception e) {
-        _logger.warn("Download segment {} from deepstore uri {} failed.", 
segmentName, uri, e);
-        // Download from deep store failed; try to download from peer if peer 
download is setup for the table.
-        if (_peerDownloadScheme != null) {
-          downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-        } else {
-          throw e;
-        }
-      }
+    SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager == null) {
+      // When adding a new segment, we should register it 'before' it is fully 
initialized by
+      // partitionUpsertMetadataManager. Because when processing docs in the 
new segment, the docs in the other
+      // segments may be invalidated, making the queries see less valid docs 
than expected. We should let query
+      // access the new segment asap even though its validDocId bitmap is 
still being filled by
+      // partitionUpsertMetadataManager.
+      registerSegment(segmentName, newSegmentManager);
+      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      _logger.info("Added new immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
     } else {
-      if (_peerDownloadScheme != null) {
-        downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-      } else {
-        throw new RuntimeException("Peer segment download not enabled for 
segment " + segmentName);
-      }
-    }
-  }
-
-  private void downloadSegmentFromDeepStore(String segmentName, 
IndexLoadingConfig indexLoadingConfig, String uri) {
-    // This could leave temporary directories in _indexDir if JVM shuts down 
before the temp directory is deleted.
-    // This is fine since the temporary directories are deleted when the table 
data manager calls init.
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + 
System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
-      _logger.info("Downloaded file from {} to {}; Length of downloaded file: 
{}", uri, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, 
tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Failed to download segment {} from deep store: ", 
segmentName, e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
+      // When replacing a segment, we should register the new segment 'after' 
it is fully initialized by
+      // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
+      // to the valid docs in the old segment immediately, but the validDocId 
bitmap of the new segment is still
+      // being filled by partitionUpsertMetadataManager, making the queries 
see less valid docs than expected.
+      // When replacing a segment, the new and old segments are assumed to 
have same set of valid docs for data
+      // consistency, otherwise the new segment should be named differently to 
go through the addSegment flow above.
+      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager);
+      _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, _tableNameWithType);
+      releaseSegment(oldSegmentManager);
     }
   }
 
   /**
-   * Untars the new segment and replaces the existing segment.
+   * Replaces the CONSUMING segment with a downloaded sealed one.
    */
-  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, File segmentTarFile,
-      File tempRootDir)
-      throws IOException {
-    File untarDir = new File(tempRootDir, segmentName);
-    File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, 
untarDir).get(0);
-    _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, 
untarDir);
-    File indexDir = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(indexDir);
-    FileUtils.moveDirectory(untaredSegDir, indexDir);
-    _logger.info("Replacing LLC Segment {}", segmentName);
-    replaceLLSegment(segmentName, indexLoadingConfig);
-  }
-
-  private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig 
indexLoadingConfig) {
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + 
System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      // Next download the segment from a randomly chosen server using 
configured download scheme (http or https).
-      
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
 () -> {
-        List<URI> peerServerURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
_tableNameWithType, segmentName,
-                _peerDownloadScheme);
-        Collections.shuffle(peerServerURIs);
-        return peerServerURIs;
-      }, segmentTarFile);
-      _logger.info("Fetched segment {} successfully to {} of size {}", 
segmentName, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, 
tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Download and move segment {} from peer with scheme {} 
failed.", segmentName, _peerDownloadScheme,
-          e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
-    }
+  public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
+    _logger.info("Downloading and replacing CONSUMING segment: {} with sealed 
one", segmentName);
+    File indexDir = downloadSegment(zkMetadata);
+    // Get a new index loading config with latest table config and schema to 
load the segment
+    IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null);
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig));
+    _logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
   }
 
   /**
-   * Replaces a committed LLC REALTIME segment.
+   * Replaces the CONSUMING segment with the sealed one.

Review Comment:
   nit: `... with the one sealed locally` to be more precise?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -377,111 +369,112 @@ private boolean isUpsertPreloadEnabled() {
         && upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload();
   }
 
-  /*
-   * This call comes in one of two ways:
-   * For HL Segments:
-   * - We are being directed by helix to own up all the segments that we committed and are still in retention. In
-   * this case we treat it exactly like how OfflineTableDataManager would -- wrap it into an
-   * OfflineSegmentDataManager, and put it in the map.
-   * - We are being asked to own up a new realtime segment. In this case, we wrap the segment with a
-   * RealTimeSegmentDataManager (that kicks off consumption). When the segment is committed we get notified via the
-   * notifySegmentCommitted call, at which time we replace the segment with the OfflineSegmentDataManager
-   *
-   * For LL Segments:
-   * - We are being asked to start consuming from a partition.
-   * - We did not know about the segment and are being asked to download and own the segment (re-balancing, or
-   *   replacing a realtime server with a fresh one, maybe). We need to look at segment metadata and decide whether
-   *   to start consuming or download the segment.
+  /**
+   * Handles upsert preload if it is enabled.
    */
-  @Override
-  public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
-      throws Exception {
-    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
-        segmentName, _tableNameWithType);
-    boolean upsertPreloadEnabled = isUpsertPreloadEnabled();
-    if (upsertPreloadEnabled) {
-      Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, null);
-      Preconditions.checkNotNull(partitionId,
-          String.format("Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName,
-              _tableNameWithType));
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-          _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
-      partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig);
-      // Continue to add segment after preloading, as the segment might not be added by preloading.
+  private void handleUpsertPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+    if (!isUpsertPreloadEnabled()) {
+      return;
     }
+    String segmentName = zkMetadata.getSegmentName();
+    Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+    Preconditions.checkState(partitionId != null,
+        String.format("Failed to get partition id for segment: %s in upsert-enabled table: %s", segmentName,
+            _tableNameWithType));
+    _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+  }
+
+  protected void doAddOnlineSegment(String segmentName)
+      throws Exception {
+    SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+    Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
+        "Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType);
+    IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
+    handleUpsertPreload(zkMetadata, indexLoadingConfig);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
-    if (segmentDataManager != null) {
-      if (upsertPreloadEnabled) {
-        _logger.debug(
-            "Skipping adding existing segment: {} for table: {} with data manager class: {}, as it's preloaded",
-            segmentName, _tableNameWithType, segmentDataManager.getClass().getSimpleName());
+    if (segmentDataManager == null) {
+      addNewOnlineSegment(zkMetadata, indexLoadingConfig);
+    } else {
+      if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+        _logger.info("Changing segment: {} from CONSUMING to ONLINE", segmentName);
+        ((RealtimeSegmentDataManager) segmentDataManager).goOnlineFromConsuming(zkMetadata);
+        onConsumingToOnline(segmentName);
       } else {
-        _logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", segmentName,
-            _tableNameWithType, segmentDataManager.getClass().getSimpleName());
+        replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig);
       }
+    }
+  }
+
+  @Override
+  public void addConsumingSegment(String segmentName) {
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot add CONSUMING segment: %s to table: %s", segmentName,
+        _tableNameWithType);
+    _logger.info("Adding CONSUMING segment: {}", segmentName);
+    Lock segmentLock = getSegmentLock(segmentName);
+    segmentLock.lock();
+    try {
+      doAddConsumingSegment(segmentName);
+    } catch (Exception e) {
+      addSegmentError(segmentName,
+          new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while adding CONSUMING segment", e));
+      throw e;
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  private void doAddConsumingSegment(String segmentName) {
+    SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+    // NOTE: We do not throw exception here because the segment might have just been committed before the state
+    //       transition is processed. We can skip adding this segment, and rely on the following CONSUMING -> ONLINE
+    //       state transition to add it.

Review Comment:
   how about adding this to the comment: We can skip ..., and the segment will enter 'CONSUMING' state, and we can rely on ...
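   One possible merged wording (illustrative only, not final text):
   ```
   // NOTE: We do not throw exception here because the segment might have just been committed before the state
   //       transition is processed. We can skip adding this segment, and the segment will enter 'CONSUMING' state;
   //       we can then rely on the following CONSUMING -> ONLINE state transition to add it.
   ```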



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java:
##########
@@ -76,22 +79,52 @@ void init(InstanceDataManagerConfig instanceDataManagerConfig, TableConfig table
 
   boolean isShutDown();
 
+  /**
+   * Returns the segment lock for a segment in the table.
+   */
+  Lock getSegmentLock(String segmentName);
+
+  /**
+   * Returns whether the segment is loaded in the table.
+   */
+  boolean hasSegment(String segmentName);
+
   /**
    * Adds a loaded immutable segment into the table.
    */
+  @VisibleForTesting
   void addSegment(ImmutableSegment immutableSegment);

Review Comment:
   how about moving this into the base class, as it's more of an internal helper method now?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -377,111 +369,112 @@ private boolean isUpsertPreloadEnabled() {
         && upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload();
   }
 
-  /*
-   * This call comes in one of two ways:
-   * For HL Segments:
-   * - We are being directed by helix to own up all the segments that we committed and are still in retention. In
-   * this case we treat it exactly like how OfflineTableDataManager would -- wrap it into an
-   * OfflineSegmentDataManager, and put it in the map.
-   * - We are being asked to own up a new realtime segment. In this case, we wrap the segment with a
-   * RealTimeSegmentDataManager (that kicks off consumption). When the segment is committed we get notified via the
-   * notifySegmentCommitted call, at which time we replace the segment with the OfflineSegmentDataManager
-   *
-   * For LL Segments:
-   * - We are being asked to start consuming from a partition.
-   * - We did not know about the segment and are being asked to download and own the segment (re-balancing, or
-   *   replacing a realtime server with a fresh one, maybe). We need to look at segment metadata and decide whether
-   *   to start consuming or download the segment.
+  /**
+   * Handles upsert preload if it is enabled.
    */
-  @Override
-  public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
-      throws Exception {
-    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
-        segmentName, _tableNameWithType);
-    boolean upsertPreloadEnabled = isUpsertPreloadEnabled();
-    if (upsertPreloadEnabled) {
-      Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, null);
-      Preconditions.checkNotNull(partitionId,
-          String.format("Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName,
-              _tableNameWithType));
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-          _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
-      partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig);
-      // Continue to add segment after preloading, as the segment might not be added by preloading.
+  private void handleUpsertPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+    if (!isUpsertPreloadEnabled()) {
+      return;
     }
+    String segmentName = zkMetadata.getSegmentName();
+    Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+    Preconditions.checkState(partitionId != null,
+        String.format("Failed to get partition id for segment: %s in upsert-enabled table: %s", segmentName,
+            _tableNameWithType));
+    _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+  }
+
+  protected void doAddOnlineSegment(String segmentName)
+      throws Exception {
+    SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+    Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
+        "Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType);
+    IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
+    handleUpsertPreload(zkMetadata, indexLoadingConfig);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
-    if (segmentDataManager != null) {
-      if (upsertPreloadEnabled) {
-        _logger.debug(
-            "Skipping adding existing segment: {} for table: {} with data manager class: {}, as it's preloaded",
-            segmentName, _tableNameWithType, segmentDataManager.getClass().getSimpleName());
+    if (segmentDataManager == null) {
+      addNewOnlineSegment(zkMetadata, indexLoadingConfig);
+    } else {
+      if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+        _logger.info("Changing segment: {} from CONSUMING to ONLINE", segmentName);
+        ((RealtimeSegmentDataManager) segmentDataManager).goOnlineFromConsuming(zkMetadata);
+        onConsumingToOnline(segmentName);
       } else {
-        _logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", segmentName,
-            _tableNameWithType, segmentDataManager.getClass().getSimpleName());
+        replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig);
       }
+    }
+  }
+
+  @Override
+  public void addConsumingSegment(String segmentName) {
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot add CONSUMING segment: %s to table: %s", segmentName,
+        _tableNameWithType);
+    _logger.info("Adding CONSUMING segment: {}", segmentName);
+    Lock segmentLock = getSegmentLock(segmentName);
+    segmentLock.lock();
+    try {
+      doAddConsumingSegment(segmentName);
+    } catch (Exception e) {
+      addSegmentError(segmentName,
+          new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while adding CONSUMING segment", e));
+      throw e;
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  private void doAddConsumingSegment(String segmentName) {
+    SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+    // NOTE: We do not throw exception here because the segment might have just been committed before the state
+    //       transition is processed. We can skip adding this segment, and rely on the following CONSUMING -> ONLINE
+    //       state transition to add it.
+    if (zkMetadata.getStatus() != Status.IN_PROGRESS) {
+      _logger.warn("Segment: {} is already committed, skipping adding it as CONSUMING segment", segmentName);
+      return;
+    }
+    IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
+    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
+    if (segmentDataManager != null) {
+      _logger.warn("Segment: {} already exists, skipping adding it as CONSUMING segment", segmentName);

Review Comment:
   nit: check whether segmentDataManager is a mutable or an immutable segment and log it for debugging
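   For example, a minimal sketch (the wording is illustrative; per the diff above, RealtimeSegmentDataManager is the manager for the mutable segment):
   ```
   if (segmentDataManager != null) {
     // Log whether the existing manager holds a mutable (consuming) or an immutable segment, to ease debugging.
     _logger.warn("Segment: {} already exists as {} segment, skipping adding it as CONSUMING segment", segmentName,
         segmentDataManager instanceof RealtimeSegmentDataManager ? "mutable" : "immutable");
     return;
   }
   ```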



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java:
##########
@@ -67,6 +67,8 @@ public interface PartitionUpsertMetadataManager extends 
Closeable {
 
   /**
    * Preload segments for the table partition. Segments can be added 
differently during preloading.
+   * TODO: Revisit this and see if we can use the same IndexLoadingConfig for all segments. Tier info might be different
+   *       for different segments.

Review Comment:
   good catch, I think we could fix it like below:
   ```
   void doPreloadSegmentWithSnapshot(...) {
     // need to make a copy of indexLoadingConfig; setSegmentTier(segmentZKMetadata.getTier());
     // pass the copy of indexLoadingConfig to method below
     tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, segmentZKMetadata);
   }
   ```
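   Spelled out a bit more (a sketch only; the copy constructor is a hypothetical stand-in for whatever copy mechanism IndexLoadingConfig provides):
   ```
   void doPreloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata segmentZKMetadata) {
     // Copy the shared config so setting the tier for this segment does not leak to other segments.
     IndexLoadingConfig configCopy = new IndexLoadingConfig(indexLoadingConfig);  // hypothetical copy constructor
     configCopy.setSegmentTier(segmentZKMetadata.getTier());
     tableDataManager.tryLoadExistingSegment(segmentName, configCopy, segmentZKMetadata);
   }
   ```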



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment immutableSegment) {
       _logger.info("Preloaded immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
       return;
     }
-    // Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may
-    // happen in two threads, i.e. the consuming thread that's committing the mutable segment and a HelixTaskExecutor
-    // thread that's bringing segment from ONLINE to CONSUMING when the server finds consuming thread can't commit
-    // the segment in time. The slower thread takes the reference of the oldSegment here, but it may get closed by
-    // the faster thread if not synchronized. In particular, the slower thread may iterate the primary keys in the
-    // oldSegment, causing seg fault. So we have to take a lock here.
-    // However, we can't just reuse the existing segmentLocks. Because many methods of partitionUpsertMetadataManager
-    // takes this lock internally, but after taking snapshot RW lock. If we take segmentLock here (before taking
-    // snapshot RW lock), we can get into deadlock with threads calling partitionUpsertMetadataManager's other
-    // methods, like removeSegment.
-    // Adding segment should be done by a single HelixTaskExecutor thread, but do it with lock here for simplicity
-    // otherwise, we'd need to double-check if oldSegmentManager is null.
-    Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, segmentName);
-    segmentLock.lock();
-    try {
-      SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
-      if (oldSegmentManager == null) {
-        // When adding a new segment, we should register it 'before' it is fully initialized by
-        // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other
-        // segments may be invalidated, making the queries see less valid docs than expected. We should let query
-        // access the new segment asap even though its validDocId bitmap is still being filled by
-        // partitionUpsertMetadataManager.
-        registerSegment(segmentName, newSegmentManager);
-        partitionUpsertMetadataManager.addSegment(immutableSegment);
-        _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
-      } else {
-        // When replacing a segment, we should register the new segment 'after' it is fully initialized by
-        // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access
-        // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still
-        // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected.
-        // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data
-        // consistency, otherwise the new segment should be named differently to go through the addSegment flow above.
-        IndexSegment oldSegment = oldSegmentManager.getSegment();
-        partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
-        registerSegment(segmentName, newSegmentManager);
-        _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
-            oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
-        releaseSegment(oldSegmentManager);
-      }
-    } finally {
-      segmentLock.unlock();
-    }
-  }
-
-  @Override
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
-    // Cannot download consuming segment
-    if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
-      return false;
-    }
-    // TODO: may support download from peer servers as well.
-    return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
-  }
-
-  void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata,
-      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
-    String uri = segmentZKMetadata.getDownloadUrl();
-    if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
-      try {
-        // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables.
-        //       https://github.com/apache/pinot/issues/9752
-        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
-      } catch (Exception e) {
-        _logger.warn("Download segment {} from deepstore uri {} failed.", 
segmentName, uri, e);
-        // Download from deep store failed; try to download from peer if peer 
download is setup for the table.
-        if (_peerDownloadScheme != null) {
-          downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-        } else {
-          throw e;
-        }
-      }
+    SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager == null) {
+      // When adding a new segment, we should register it 'before' it is fully initialized by
+      // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other
+      // segments may be invalidated, making the queries see less valid docs than expected. We should let query
+      // access the new segment asap even though its validDocId bitmap is still being filled by
+      // partitionUpsertMetadataManager.
+      registerSegment(segmentName, newSegmentManager);
+      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
     } else {
-      if (_peerDownloadScheme != null) {
-        downloadSegmentFromPeer(segmentName, indexLoadingConfig);
-      } else {
-        throw new RuntimeException("Peer segment download not enabled for 
segment " + segmentName);
-      }
-    }
-  }
-
-  private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) {
-    // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted.
-    // This is fine since the temporary directories are deleted when the table data manager calls init.
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
-      _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Failed to download segment {} from deep store: ", segmentName, e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
+      // When replacing a segment, we should register the new segment 'after' it is fully initialized by
+      // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access
+      // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still
+      // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected.
+      // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data
+      // consistency, otherwise the new segment should be named differently to go through the addSegment flow above.
+      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
+      registerSegment(segmentName, newSegmentManager);
+      _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
+      releaseSegment(oldSegmentManager);
     }
   }
 
   /**
-   * Untars the new segment and replaces the existing segment.
+   * Replaces the CONSUMING segment with a downloaded sealed one.
    */
-  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile,
-      File tempRootDir)
-      throws IOException {
-    File untarDir = new File(tempRootDir, segmentName);
-    File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0);
-    _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir);
-    File indexDir = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(indexDir);
-    FileUtils.moveDirectory(untaredSegDir, indexDir);
-    _logger.info("Replacing LLC Segment {}", segmentName);
-    replaceLLSegment(segmentName, indexLoadingConfig);
-  }
-
-  private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) {
-    File tempRootDir = null;
-    try {
-      tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
-      File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      // Next download the segment from a randomly chosen server using configured download scheme (http or https).
-      SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> {
-        List<URI> peerServerURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName,
-                _peerDownloadScheme);
-        Collections.shuffle(peerServerURIs);
-        return peerServerURIs;
-      }, segmentTarFile);
-      _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile,
-          segmentTarFile.length());
-      untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir);
-    } catch (Exception e) {
-      _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme,
-          e);
-      throw new RuntimeException(e);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
-    }
+  public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+      throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
+    _logger.info("Downloading and replacing CONSUMING segment: {} with sealed 
one", segmentName);

Review Comment:
   s/sealed/committed/ to be a bit more precise?
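   i.e. the log line above would then read:
   ```
   _logger.info("Downloading and replacing CONSUMING segment: {} with committed one", segmentName);
   ```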



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

