klsince commented on code in PR #12886: URL: https://github.com/apache/pinot/pull/12886#discussion_r1569585817
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -268,64 +282,192 @@ public void addSegment(ImmutableSegment immutableSegment) { String segmentName = immutableSegment.getSegmentName(); Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", segmentName, _tableNameWithType); - _logger.info("Adding immutable segment: {} to table: {}", segmentName, _tableNameWithType); + _logger.info("Adding immutable segment: {}", segmentName); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, immutableSegment.getSegmentMetadata().getTotalDocs()); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment); SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager); if (oldSegmentManager == null) { - _logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType); + _logger.info("Added new immutable segment: {}", segmentName); } else { - _logger.info("Replaced immutable segment: {} of table: {}", segmentName, _tableNameWithType); + _logger.info("Replaced immutable segment: {}", segmentName); releaseSegment(oldSegmentManager); } } @Override - public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) + public void addOnlineSegment(String segmentName) throws Exception { - Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", - indexDir.getName(), _tableNameWithType); + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot add ONLINE segment: %s to table: %s", segmentName, + _tableNameWithType); + _logger.info("Adding ONLINE segment: {} to table: {}", segmentName, _tableNameWithType); Review Comment: as the other places, no need to log 
`_tableNameWithType`? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String segmentName) { } } - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - return true; - } - - protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata) - throws Exception { - // TODO: may support download from peer servers for RealTime table. - return downloadSegmentFromDeepStore(segmentName, zkMetadata); - } - - private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata) + /** + * Downloads an immutable segment into the index directory. + * Segment can be downloaded from deep store or from peer servers. Downloaded segment might be compressed or + * encrypted, and this method takes care of decompressing and decrypting the segment. + */ + protected File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); - if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { - try { - File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir, - _streamSegmentDownloadUntarRateLimitBytesPerSec); - return moveSegment(segmentName, untaredSegDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } else { - try { - File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir); - return untarAndMoveSegment(segmentName, tarFile, tempRootDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } - } - - private File moveSegment(String segmentName, File untaredSegDir) - throws IOException { + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + Preconditions.checkState(downloadUrl != null, + "Failed to find download URL in ZK metadata for segment: %s of table: 
%s", segmentName, _tableNameWithType); try { - File indexDir = getSegmentDataDir(segmentName); - FileUtils.deleteDirectory(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - return indexDir; + if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) { + try { + return downloadSegmentFromDeepStore(zkMetadata); + } catch (Exception e) { + if (_peerDownloadScheme != null) { Review Comment: add a log noting that this retries the download from peers because downloading from the deep store failed. nit: to reduce nesting, handle the peer-download case first: ``` if (is peer download) { return downloadSegmentFromPeers(zkMetadata); } ... ``` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment immutableSegment) { _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); return; } - // Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may - // happen in two threads, i.e. the consuming thread that's committing the mutable segment and a HelixTaskExecutor - // thread that's bringing segment from ONLINE to CONSUMING when the server finds consuming thread can't commit - // the segment in time. The slower thread takes the reference of the oldSegment here, but it may get closed by - // the faster thread if not synchronized. In particular, the slower thread may iterate the primary keys in the - // oldSegment, causing seg fault. So we have to take a lock here. - // However, we can't just reuse the existing segmentLocks. Because many methods of partitionUpsertMetadataManager - // takes this lock internally, but after taking snapshot RW lock. If we take segmentLock here (before taking - // snapshot RW lock), we can get into deadlock with threads calling partitionUpsertMetadataManager's other - // methods, like removeSegment.
- // Adding segment should be done by a single HelixTaskExecutor thread, but do it with lock here for simplicity - // otherwise, we'd need to double-check if oldSegmentManager is null. - Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); - if (oldSegmentManager == null) { - // When adding a new segment, we should register it 'before' it is fully initialized by - // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other - // segments may be invalidated, making the queries see less valid docs than expected. We should let query - // access the new segment asap even though its validDocId bitmap is still being filled by - // partitionUpsertMetadataManager. - registerSegment(segmentName, newSegmentManager); - partitionUpsertMetadataManager.addSegment(immutableSegment); - _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); - } else { - // When replacing a segment, we should register the new segment 'after' it is fully initialized by - // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access - // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still - // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. - // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data - // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. 
- IndexSegment oldSegment = oldSegmentManager.getSegment(); - partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); - registerSegment(segmentName, newSegmentManager); - _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", - oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); - releaseSegment(oldSegmentManager); - } - } finally { - segmentLock.unlock(); - } - } - - @Override - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - // Cannot download consuming segment - if (zkMetadata.getStatus() == Status.IN_PROGRESS) { - return false; - } - // TODO: may support download from peer servers as well. - return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl()); - } - - void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata, - IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { - String uri = segmentZKMetadata.getDownloadUrl(); - if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) { - try { - // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables. - // https://github.com/apache/pinot/issues/9752 - downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); - } catch (Exception e) { - _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e); - // Download from deep store failed; try to download from peer if peer download is setup for the table. - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw e; - } - } + SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); + if (oldSegmentManager == null) { + // When adding a new segment, we should register it 'before' it is fully initialized by + // partitionUpsertMetadataManager. 
Because when processing docs in the new segment, the docs in the other + // segments may be invalidated, making the queries see less valid docs than expected. We should let query + // access the new segment asap even though its validDocId bitmap is still being filled by + // partitionUpsertMetadataManager. + registerSegment(segmentName, newSegmentManager); + partitionUpsertMetadataManager.addSegment(immutableSegment); + _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); } else { - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw new RuntimeException("Peer segment download not enabled for segment " + segmentName); - } - } - } - - private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) { - // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted. - // This is fine since the temporary directories are deleted when the table data manager calls init. - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile); - _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Failed to download segment {} from deep store: ", segmentName, e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); + // When replacing a segment, we should register the new segment 'after' it is fully initialized by + // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access + // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still + // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. + // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data + // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. + IndexSegment oldSegment = oldSegmentManager.getSegment(); + partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); + registerSegment(segmentName, newSegmentManager); + _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", + oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); + releaseSegment(oldSegmentManager); } } /** - * Untars the new segment and replaces the existing segment. + * Replaces the CONSUMING segment with a downloaded sealed one. */ - private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile, - File tempRootDir) - throws IOException { - File untarDir = new File(tempRootDir, segmentName); - File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0); - _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir); - File indexDir = new File(_indexDir, segmentName); - FileUtils.deleteQuietly(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - _logger.info("Replacing LLC Segment {}", segmentName); - replaceLLSegment(segmentName, indexLoadingConfig); - } - - private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) { - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." 
+ System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - // Next download the segment from a randomly chosen server using configured download scheme (http or https). - SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> { - List<URI> peerServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, - _peerDownloadScheme); - Collections.shuffle(peerServerURIs); - return peerServerURIs; - }, segmentTarFile); - _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme, - e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } + public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata) + throws Exception { + String segmentName = zkMetadata.getSegmentName(); + _logger.info("Downloading and replacing CONSUMING segment: {} with sealed one", segmentName); + File indexDir = downloadSegment(zkMetadata); + // Get a new index loading config with latest table config and schema to load the segment + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null); Review Comment: why not pass zkMetadata into this method? 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -428,17 +570,25 @@ public ExecutorService getSegmentPreloadExecutor() { @Override public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInfo) { - _errorCache.put(Pair.of(_tableNameWithType, segmentName), segmentErrorInfo); + if (_errorCache != null) { + _errorCache.put(Pair.of(_tableNameWithType, segmentName), segmentErrorInfo); + } } @Override public Map<String, SegmentErrorInfo> getSegmentErrors() { - if (_errorCache == null) { - return Collections.emptyMap(); + if (_errorCache != null) { + // Filter out entries that match the table name + Map<String, SegmentErrorInfo> segmentErrors = new HashMap<>(); + for (Map.Entry<Pair<String, String>, SegmentErrorInfo> entry : _errorCache.asMap().entrySet()) { + Pair<String, String> tableSegmentPair = entry.getKey(); + if (tableSegmentPair.getLeft().equals(_tableNameWithType)) { + segmentErrors.put(tableSegmentPair.getRight(), entry.getValue()); + } + } + return segmentErrors; } else { - // Filter out entries that match the table name. - return _errorCache.asMap().entrySet().stream().filter(map -> map.getKey().getLeft().equals(_tableNameWithType)) - .collect(Collectors.toMap(map -> map.getKey().getRight(), Map.Entry::getValue)); + return Map.of(); Review Comment: nit: why not continue to use Collections.emptyMap()? I didn't see Map.of() used elsewhere in the repo. IIRC it used to cause compile issues with older JDKs (e.g. JDK 8), since Map.of() was only added in Java 9.
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -268,64 +282,192 @@ public void addSegment(ImmutableSegment immutableSegment) { String segmentName = immutableSegment.getSegmentName(); Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", segmentName, _tableNameWithType); - _logger.info("Adding immutable segment: {} to table: {}", segmentName, _tableNameWithType); + _logger.info("Adding immutable segment: {}", segmentName); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, immutableSegment.getSegmentMetadata().getTotalDocs()); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment); SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager); if (oldSegmentManager == null) { - _logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType); + _logger.info("Added new immutable segment: {}", segmentName); } else { - _logger.info("Replaced immutable segment: {} of table: {}", segmentName, _tableNameWithType); + _logger.info("Replaced immutable segment: {}", segmentName); releaseSegment(oldSegmentManager); } } @Override - public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) + public void addOnlineSegment(String segmentName) throws Exception { - Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", - indexDir.getName(), _tableNameWithType); + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot add ONLINE segment: %s to table: %s", segmentName, + _tableNameWithType); + _logger.info("Adding ONLINE segment: {} to table: {}", segmentName, _tableNameWithType); + Lock segmentLock = getSegmentLock(segmentName); + 
segmentLock.lock(); + try { + doAddOnlineSegment(segmentName); + } catch (Exception e) { + addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while adding ONLINE segment", e)); + throw e; + } finally { + segmentLock.unlock(); + } + } + + protected abstract void doAddOnlineSegment(String segmentName) + throws Exception; + + protected SegmentZKMetadata getZKMetadata(String segmentName) { + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentName); + Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s of table: %s", segmentName, + _tableNameWithType); + return zkMetadata; + } + + protected TableConfig getTableConfig() { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); + return tableConfig; + } + + @Nullable + protected Schema getSchema(TableConfig tableConfig) { + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); + // NOTE: Schema is mandatory for REALTIME table. 
+ if (tableConfig.getTableType() == TableType.REALTIME) { + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); + } + return schema; + } + + protected IndexLoadingConfig getIndexLoadingConfig(@Nullable SegmentZKMetadata zkMetadata) { + TableConfig tableConfig = getTableConfig(); + Schema schema = getSchema(tableConfig); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); indexLoadingConfig.setTableDataDir(_tableDataDir); indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs()); - addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema())); + if (zkMetadata != null) { + indexLoadingConfig.setSegmentTier(zkMetadata.getTier()); + } + return indexLoadingConfig; } - @Override - public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata) + /** + * Adds a new ONLINE segment that is not already loaded. + */ + protected void addNewOnlineSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) + throws Exception { + String segmentName = zkMetadata.getSegmentName(); + _logger.info("Adding new ONLINE segment: {}", segmentName); + if (!tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata)) { + downloadAndLoadSegment(zkMetadata, indexLoadingConfig); + } + } + + /** + * Replaces an already loaded segment in a table if the segment has been overridden in the deep store (CRC mismatch). 
+ */ + protected void replaceSegmentIfCrcMismatch(SegmentDataManager segmentDataManager, SegmentZKMetadata zkMetadata, + IndexLoadingConfig indexLoadingConfig) throws Exception { - throw new UnsupportedOperationException(); + String segmentName = segmentDataManager.getSegmentName(); + Preconditions.checkState(segmentDataManager instanceof ImmutableSegmentDataManager, + "Cannot replace CONSUMING segment: %s in table: %s", segmentName, _tableNameWithType); + SegmentMetadata localMetadata = segmentDataManager.getSegment().getSegmentMetadata(); + if (hasSameCRC(zkMetadata, localMetadata)) { + _logger.info("Segment: {} has CRC: {} same as before, not replacing it", segmentName, localMetadata.getCrc()); + return; + } + _logger.info("Replacing segment: {} because its CRC has changed from: {} to: {}", segmentName, + localMetadata.getCrc(), zkMetadata.getCrc()); + downloadAndLoadSegment(zkMetadata, indexLoadingConfig); + _logger.info("Replaced segment: {} with new CRC: {}", segmentName, zkMetadata.getCrc()); } /** - * Called when we get a helix transition to go to offline or dropped state. - * We need to remove it safely, keeping in mind that there may be queries that are - * using the segment, - * @param segmentName name of the segment to remove. + * Downloads a segment and loads it into the table. 
*/ + protected void downloadAndLoadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) + throws Exception { + String segmentName = zkMetadata.getSegmentName(); + _logger.info("Downloading and loading segment: {}", segmentName); + File indexDir = downloadSegment(zkMetadata); + addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig)); + _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), + TierConfigUtils.normalizeTierName(zkMetadata.getTier())); + } + @Override - public void removeSegment(String segmentName) { - // Allow removing segment after shutdown so that we can remove the segment when the table is deleted + public void replaceSegment(String segmentName) + throws Exception { + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot replace segment: %s in table: %s", segmentName, + _tableNameWithType); + _logger.info("Replacing segment: {}", segmentName); + Lock segmentLock = getSegmentLock(segmentName); + segmentLock.lock(); + try { + doReplaceSegment(segmentName); + } catch (Exception e) { + addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while replacing segment", e)); + throw e; + } finally { + segmentLock.unlock(); + } + } + + protected void doReplaceSegment(String segmentName) + throws Exception { + SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); + if (segmentDataManager != null) { + SegmentZKMetadata zkMetadata = getZKMetadata(segmentName); + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata); + replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig); + } else { + _logger.warn("Failed to find segment: {}, skipping replacing it", segmentName); + } + } + + @Override + public void offloadSegment(String segmentName) { + // NOTE: Do not throw exception when data manager has been shut down. 
This is regular flow when a table is deleted. if (_shutDown) { - _logger.info("Table data manager is already shut down, skip removing segment: {} from table: {}", segmentName, - _tableNameWithType); + _logger.info("Table data manager is already shut down, skipping offloading segment: {}", segmentName); return; } - _logger.info("Removing segment: {} from table: {}", segmentName, _tableNameWithType); + _logger.info("Offloading segment: {}", segmentName); + Lock segmentLock = getSegmentLock(segmentName); + segmentLock.lock(); + try { + doOffloadSegment(segmentName); + } catch (Exception e) { + addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while offloading segment", e)); + throw e; + } finally { + segmentLock.unlock(); + } + } + + protected void doOffloadSegment(String segmentName) { SegmentDataManager segmentDataManager = unregisterSegment(segmentName); if (segmentDataManager != null) { + segmentDataManager.offload(); releaseSegment(segmentDataManager); - _logger.info("Removed segment: {} from table: {}", segmentName, _tableNameWithType); + _logger.info("Offloaded segment: {}", segmentName); } else { - _logger.info("Failed to find segment: {} in table: {}", segmentName, _tableNameWithType); + _logger.warn("Failed to find segment: {}, skipping offloading it", _tableNameWithType); Review Comment: s/_tableNameWithType/segmentName ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String segmentName) { } } - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - return true; - } - - protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata) - throws Exception { - // TODO: may support download from peer servers for RealTime table. 
- return downloadSegmentFromDeepStore(segmentName, zkMetadata); - } - - private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata) + /** + * Downloads an immutable segment into the index directory. + * Segment can be downloaded from deep store or from peer servers. Downloaded segment might be compressed or + * encrypted, and this method takes care of decompressing and decrypting the segment. + */ + protected File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); - if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { - try { - File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir, - _streamSegmentDownloadUntarRateLimitBytesPerSec); - return moveSegment(segmentName, untaredSegDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } else { - try { - File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir); - return untarAndMoveSegment(segmentName, tarFile, tempRootDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } - } - - private File moveSegment(String segmentName, File untaredSegDir) - throws IOException { + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + Preconditions.checkState(downloadUrl != null, + "Failed to find download URL in ZK metadata for segment: %s of table: %s", segmentName, _tableNameWithType); try { - File indexDir = getSegmentDataDir(segmentName); - FileUtils.deleteDirectory(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - return indexDir; + if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) { + try { + return downloadSegmentFromDeepStore(zkMetadata); + } catch (Exception e) { + if (_peerDownloadScheme != null) { + return downloadSegmentFromPeers(zkMetadata); + } else { + throw e; + } + } + } else { + return 
downloadSegmentFromPeers(zkMetadata); + } } catch (Exception e) { - LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1); throw e; } } @VisibleForTesting - File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir) + File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) throws Exception { - File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - String uri = zkMetadata.getDownloadUrl(); - boolean downloadSuccess = false; + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl); + File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); + if (_segmentDownloadSemaphore != null) { + long startTime = System.currentTimeMillis(); + _logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName, + _segmentDownloadSemaphore.getQueueLength()); + _segmentDownloadSemaphore.acquire(); + _logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).", + segmentName, System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); + } try { - if (_segmentDownloadSemaphore != null) { - long startTime = System.currentTimeMillis(); - LOGGER.info("Trying to acquire segment download semaphore for: {}. 
queue-length: {} ", segmentName, - _segmentDownloadSemaphore.getQueueLength()); - _segmentDownloadSemaphore.acquire(); - LOGGER.info("Acquired segment download semaphore for: {} (lock-time={}ms, queue-length={}).", segmentName, - System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); - } - SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName()); - LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, - _tableNameWithType, uri, tarFile, tarFile.length()); - downloadSuccess = true; - return tarFile; - } catch (AttemptsExceededException e) { - LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, - _tableNameWithType, uri, tarFile); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L); - if (_peerDownloadScheme == null) { - throw e; + File untarredSegmentDir; + if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { + _logger.info("Downloading segment: {} using streamed download-untar with maxStreamRateInByte: {}", segmentName, + _streamSegmentDownloadUntarRateLimitBytesPerSec); + AtomicInteger attempts = new AtomicInteger(0); + try { + untarredSegmentDir = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir, + _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts); + _logger.info("Downloaded and untarred segment: {} from: {}, attempts: {}", segmentName, downloadUrl, + attempts.get()); + } finally { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, Review Comment: these are not always failures, are they? This `finally` block adds `attempts.get()` to SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES even when the download succeeds, so successful attempts may be counted as failures.
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String segmentName) { } } - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - return true; - } - - protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata) - throws Exception { - // TODO: may support download from peer servers for RealTime table. - return downloadSegmentFromDeepStore(segmentName, zkMetadata); - } - - private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata) + /** + * Downloads an immutable segment into the index directory. + * Segment can be downloaded from deep store or from peer servers. Downloaded segment might be compressed or + * encrypted, and this method takes care of decompressing and decrypting the segment. + */ + protected File downloadSegment(SegmentZKMetadata zkMetadata) throws Exception { - File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); - if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { - try { - File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir, - _streamSegmentDownloadUntarRateLimitBytesPerSec); - return moveSegment(segmentName, untaredSegDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } else { - try { - File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir); - return untarAndMoveSegment(segmentName, tarFile, tempRootDir); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } - } - } - - private File moveSegment(String segmentName, File untaredSegDir) - throws IOException { + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + Preconditions.checkState(downloadUrl != null, + "Failed to find download URL in ZK metadata for segment: %s of table: %s", segmentName, 
_tableNameWithType); try { - File indexDir = getSegmentDataDir(segmentName); - FileUtils.deleteDirectory(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - return indexDir; + if (!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) { + try { + return downloadSegmentFromDeepStore(zkMetadata); + } catch (Exception e) { + if (_peerDownloadScheme != null) { + return downloadSegmentFromPeers(zkMetadata); + } else { + throw e; + } + } + } else { + return downloadSegmentFromPeers(zkMetadata); + } } catch (Exception e) { - LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1); throw e; } } @VisibleForTesting - File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir) + File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) throws Exception { - File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - String uri = zkMetadata.getDownloadUrl(); - boolean downloadSuccess = false; + String segmentName = zkMetadata.getSegmentName(); + String downloadUrl = zkMetadata.getDownloadUrl(); + _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl); + File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); + if (_segmentDownloadSemaphore != null) { + long startTime = System.currentTimeMillis(); + _logger.info("Acquiring segment download semaphore for segment: {}, queue-length: {} ", segmentName, + _segmentDownloadSemaphore.getQueueLength()); + _segmentDownloadSemaphore.acquire(); + _logger.info("Acquired segment download semaphore for segment: {} (lock-time={}ms, queue-length={}).", + segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength()); + } try { - if (_segmentDownloadSemaphore != null) { - long startTime = System.currentTimeMillis(); - LOGGER.info("Trying to acquire segment download semaphore for: {}. queue-length: {} ", segmentName, - _segmentDownloadSemaphore.getQueueLength()); - _segmentDownloadSemaphore.acquire(); - LOGGER.info("Acquired segment download semaphore for: {} (lock-time={}ms, queue-length={}).", segmentName, - System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); - } - SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName()); - LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, - _tableNameWithType, uri, tarFile, tarFile.length()); - downloadSuccess = true; - return tarFile; - } catch (AttemptsExceededException e) { - LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, - _tableNameWithType, uri, tarFile); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L); - if (_peerDownloadScheme == null) { - throw e; + File untarredSegmentDir; + if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) { + _logger.info("Downloading segment: {} using streamed download-untar with maxStreamRateInByte: {}", segmentName, + _streamSegmentDownloadUntarRateLimitBytesPerSec); + AtomicInteger attempts = new AtomicInteger(0); + try { + untarredSegmentDir = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir, + _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts); + _logger.info("Downloaded and untarred segment: {} from: {}, attempts: {}", segmentName, downloadUrl, + attempts.get()); + } finally { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, + attempts.get()); + } + } else { + File segmentTarFile = new 
File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName()); + _logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl, + segmentTarFile, segmentTarFile.length()); + untarredSegmentDir = untarSegment(segmentName, segmentTarFile, tempRootDir); } - downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile); - downloadSuccess = true; - return tarFile; + File indexDir = moveSegment(segmentName, untarredSegmentDir); + _logger.info("Downloaded segment: {} from: {} to: {}", segmentName, downloadUrl, indexDir); + return indexDir; + } catch (Exception e) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1); + throw e; } finally { - if (!downloadSuccess) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L); - } if (_segmentDownloadSemaphore != null) { _segmentDownloadSemaphore.release(); } + FileUtils.deleteQuietly(tempRootDir); } } - protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, File destTarFile) + @VisibleForTesting + File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata) throws Exception { + String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(_peerDownloadScheme != null, "Peer download is not enabled for table: %s", _tableNameWithType); + _logger.info("Downloading segment: {} from peers", segmentName); + File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); Review Comment: pull `File tempRootDir...` and `FileUtils.deleteQuietly(tempRootDir);` up into the caller `downloadSegment()` to save those few duplicated lines in `downloadSegmentFromDeepStore()` and `downloadSegmentFromPeers()` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: 
########## @@ -752,14 +752,20 @@ public void run() { _state = State.DISCARDED; break; case DEFAULT: - success = buildSegmentAndReplace(); - if (success) { - _state = State.RETAINED; - } else { - // Could not build segment for some reason. We can only download it. - _state = State.ERROR; - _segmentLogger.error("Could not build segment for {}", _segmentNameStr); + // Lock the segment to avoid multiple threads touching the same segment. + Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr); + segmentLock.lock(); + try { + if (buildSegmentAndReplace()) { Review Comment: move lock/unlock into buildSegmentAndReplace() method? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment immutableSegment) { _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); return; } - // Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may - // happen in two threads, i.e. the consuming thread that's committing the mutable segment and a HelixTaskExecutor - // thread that's bringing segment from ONLINE to CONSUMING when the server finds consuming thread can't commit - // the segment in time. The slower thread takes the reference of the oldSegment here, but it may get closed by - // the faster thread if not synchronized. In particular, the slower thread may iterate the primary keys in the - // oldSegment, causing seg fault. So we have to take a lock here. - // However, we can't just reuse the existing segmentLocks. Because many methods of partitionUpsertMetadataManager - // takes this lock internally, but after taking snapshot RW lock. If we take segmentLock here (before taking - // snapshot RW lock), we can get into deadlock with threads calling partitionUpsertMetadataManager's other - // methods, like removeSegment. 
- // Adding segment should be done by a single HelixTaskExecutor thread, but do it with lock here for simplicity - // otherwise, we'd need to double-check if oldSegmentManager is null. - Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); - if (oldSegmentManager == null) { - // When adding a new segment, we should register it 'before' it is fully initialized by - // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other - // segments may be invalidated, making the queries see less valid docs than expected. We should let query - // access the new segment asap even though its validDocId bitmap is still being filled by - // partitionUpsertMetadataManager. - registerSegment(segmentName, newSegmentManager); - partitionUpsertMetadataManager.addSegment(immutableSegment); - _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); - } else { - // When replacing a segment, we should register the new segment 'after' it is fully initialized by - // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access - // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still - // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. - // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data - // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. 
- IndexSegment oldSegment = oldSegmentManager.getSegment(); - partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); - registerSegment(segmentName, newSegmentManager); - _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", - oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); - releaseSegment(oldSegmentManager); - } - } finally { - segmentLock.unlock(); - } - } - - @Override - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - // Cannot download consuming segment - if (zkMetadata.getStatus() == Status.IN_PROGRESS) { - return false; - } - // TODO: may support download from peer servers as well. - return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl()); - } - - void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata, - IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { - String uri = segmentZKMetadata.getDownloadUrl(); - if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) { - try { - // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables. - // https://github.com/apache/pinot/issues/9752 - downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); - } catch (Exception e) { - _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e); - // Download from deep store failed; try to download from peer if peer download is setup for the table. - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw e; - } - } + SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); + if (oldSegmentManager == null) { + // When adding a new segment, we should register it 'before' it is fully initialized by + // partitionUpsertMetadataManager. 
Because when processing docs in the new segment, the docs in the other + // segments may be invalidated, making the queries see less valid docs than expected. We should let query + // access the new segment asap even though its validDocId bitmap is still being filled by + // partitionUpsertMetadataManager. + registerSegment(segmentName, newSegmentManager); + partitionUpsertMetadataManager.addSegment(immutableSegment); + _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); } else { - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw new RuntimeException("Peer segment download not enabled for segment " + segmentName); - } - } - } - - private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) { - // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted. - // This is fine since the temporary directories are deleted when the table data manager calls init. - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile); - _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Failed to download segment {} from deep store: ", segmentName, e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); + // When replacing a segment, we should register the new segment 'after' it is fully initialized by + // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access + // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still + // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. + // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data + // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. + IndexSegment oldSegment = oldSegmentManager.getSegment(); + partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); + registerSegment(segmentName, newSegmentManager); + _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", + oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); + releaseSegment(oldSegmentManager); } } /** - * Untars the new segment and replaces the existing segment. + * Replaces the CONSUMING segment with a downloaded sealed one. */ - private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile, - File tempRootDir) - throws IOException { - File untarDir = new File(tempRootDir, segmentName); - File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0); - _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir); - File indexDir = new File(_indexDir, segmentName); - FileUtils.deleteQuietly(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - _logger.info("Replacing LLC Segment {}", segmentName); - replaceLLSegment(segmentName, indexLoadingConfig); - } - - private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) { - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." 
+ System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - // Next download the segment from a randomly chosen server using configured download scheme (http or https). - SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> { - List<URI> peerServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, - _peerDownloadScheme); - Collections.shuffle(peerServerURIs); - return peerServerURIs; - }, segmentTarFile); - _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme, - e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } + public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata) + throws Exception { + String segmentName = zkMetadata.getSegmentName(); + _logger.info("Downloading and replacing CONSUMING segment: {} with sealed one", segmentName); + File indexDir = downloadSegment(zkMetadata); + // Get a new index loading config with latest table config and schema to load the segment + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null); + addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig)); + _logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName); } /** - * Replaces a committed LLC REALTIME segment. + * Replaces the CONSUMING segment with the sealed one. Review Comment: nit:`... with the one sealed locally` to be more precise? 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -377,111 +369,112 @@ private boolean isUpsertPreloadEnabled() { && upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload(); } - /* - * This call comes in one of two ways: - * For HL Segments: - * - We are being directed by helix to own up all the segments that we committed and are still in retention. In - * this case we treat it exactly like how OfflineTableDataManager would -- wrap it into an - * OfflineSegmentDataManager, and put it in the map. - * - We are being asked to own up a new realtime segment. In this case, we wrap the segment with a - * RealTimeSegmentDataManager (that kicks off consumption). When the segment is committed we get notified via the - * notifySegmentCommitted call, at which time we replace the segment with the OfflineSegmentDataManager - * - * For LL Segments: - * - We are being asked to start consuming from a partition. - * - We did not know about the segment and are being asked to download and own the segment (re-balancing, or - * replacing a realtime server with a fresh one, maybe). We need to look at segment metadata and decide whether - * to start consuming or download the segment. + /** + * Handles upsert preload, and returns whether the upsert preload is enabled. 
*/ - @Override - public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) - throws Exception { - Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", - segmentName, _tableNameWithType); - boolean upsertPreloadEnabled = isUpsertPreloadEnabled(); - if (upsertPreloadEnabled) { - Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, null); - Preconditions.checkNotNull(partitionId, - String.format("Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName, - _tableNameWithType)); - PartitionUpsertMetadataManager partitionUpsertMetadataManager = - _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId); - partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig); - // Continue to add segment after preloading, as the segment might not be added by preloading. + private void handleUpsertPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) { + if (!isUpsertPreloadEnabled()) { + return; } + String segmentName = zkMetadata.getSegmentName(); + Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null); + Preconditions.checkState(partitionId != null, + String.format("Failed to get partition id for segment: %s in upsert-enabled table: %s", segmentName, + _tableNameWithType)); + _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig); + } + + protected void doAddOnlineSegment(String segmentName) + throws Exception { + SegmentZKMetadata zkMetadata = getZKMetadata(segmentName); + Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, + "Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType); + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata); + handleUpsertPreload(zkMetadata, 
indexLoadingConfig); SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); - if (segmentDataManager != null) { - if (upsertPreloadEnabled) { - _logger.debug( - "Skipping adding existing segment: {} for table: {} with data manager class: {}, as it's preloaded", - segmentName, _tableNameWithType, segmentDataManager.getClass().getSimpleName()); + if (segmentDataManager == null) { + addNewOnlineSegment(zkMetadata, indexLoadingConfig); + } else { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + _logger.info("Changing segment: {} from CONSUMING to ONLINE", segmentName); + ((RealtimeSegmentDataManager) segmentDataManager).goOnlineFromConsuming(zkMetadata); + onConsumingToOnline(segmentName); } else { - _logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", segmentName, - _tableNameWithType, segmentDataManager.getClass().getSimpleName()); + replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig); } + } + } + + @Override + public void addConsumingSegment(String segmentName) { + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot add CONSUMING segment: %s to table: %s", segmentName, + _tableNameWithType); + _logger.info("Adding CONSUMING segment: {}", segmentName); + Lock segmentLock = getSegmentLock(segmentName); + segmentLock.lock(); + try { + doAddConsumingSegment(segmentName); + } catch (Exception e) { + addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while adding CONSUMING segment", e)); + throw e; + } finally { + segmentLock.unlock(); + } + } + + private void doAddConsumingSegment(String segmentName) { + SegmentZKMetadata zkMetadata = getZKMetadata(segmentName); + // NOTE: We do not throw exception here because the segment might have just been committed before the state + // transition is processed. 
We can skip adding this segment, and rely on the following CONSUMING -> ONLINE + // state transition to add it. Review Comment: how about adding this to the comment: We can skip ..., and the segment will enter 'CONSUMING' state, and we can rely on ... ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java: ########## @@ -76,22 +79,52 @@ void init(InstanceDataManagerConfig instanceDataManagerConfig, TableConfig table boolean isShutDown(); + /** + * Returns the segment lock for a segment in the table. + */ + Lock getSegmentLock(String segmentName); + + /** + * Returns whether the segment is loaded in the table. + */ + boolean hasSegment(String segmentName); + /** * Adds a loaded immutable segment into the table. */ + @VisibleForTesting void addSegment(ImmutableSegment immutableSegment); Review Comment: how about moving this into the base class, since it's more of an internal helper method now? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -377,111 +369,112 @@ private boolean isUpsertPreloadEnabled() { && upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload(); } - /* - * This call comes in one of two ways: - * For HL Segments: - * - We are being directed by helix to own up all the segments that we committed and are still in retention. In - * this case we treat it exactly like how OfflineTableDataManager would -- wrap it into an - * OfflineSegmentDataManager, and put it in the map. - * - We are being asked to own up a new realtime segment. In this case, we wrap the segment with a - * RealTimeSegmentDataManager (that kicks off consumption). When the segment is committed we get notified via the - * notifySegmentCommitted call, at which time we replace the segment with the OfflineSegmentDataManager - * - * For LL Segments: - * - We are being asked to start consuming from a partition.
- * - We did not know about the segment and are being asked to download and own the segment (re-balancing, or - * replacing a realtime server with a fresh one, maybe). We need to look at segment metadata and decide whether - * to start consuming or download the segment. + /** + * Handles upsert preload, and returns whether the upsert preload is enabled. */ - @Override - public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) - throws Exception { - Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", - segmentName, _tableNameWithType); - boolean upsertPreloadEnabled = isUpsertPreloadEnabled(); - if (upsertPreloadEnabled) { - Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, null); - Preconditions.checkNotNull(partitionId, - String.format("Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName, - _tableNameWithType)); - PartitionUpsertMetadataManager partitionUpsertMetadataManager = - _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId); - partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig); - // Continue to add segment after preloading, as the segment might not be added by preloading. 
+ private void handleUpsertPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) { + if (!isUpsertPreloadEnabled()) { + return; } + String segmentName = zkMetadata.getSegmentName(); + Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null); + Preconditions.checkState(partitionId != null, + String.format("Failed to get partition id for segment: %s in upsert-enabled table: %s", segmentName, + _tableNameWithType)); + _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig); + } + + protected void doAddOnlineSegment(String segmentName) + throws Exception { + SegmentZKMetadata zkMetadata = getZKMetadata(segmentName); + Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS, + "Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType); + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata); + handleUpsertPreload(zkMetadata, indexLoadingConfig); SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); - if (segmentDataManager != null) { - if (upsertPreloadEnabled) { - _logger.debug( - "Skipping adding existing segment: {} for table: {} with data manager class: {}, as it's preloaded", - segmentName, _tableNameWithType, segmentDataManager.getClass().getSimpleName()); + if (segmentDataManager == null) { + addNewOnlineSegment(zkMetadata, indexLoadingConfig); + } else { + if (segmentDataManager instanceof RealtimeSegmentDataManager) { + _logger.info("Changing segment: {} from CONSUMING to ONLINE", segmentName); + ((RealtimeSegmentDataManager) segmentDataManager).goOnlineFromConsuming(zkMetadata); + onConsumingToOnline(segmentName); } else { - _logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", segmentName, - _tableNameWithType, segmentDataManager.getClass().getSimpleName()); + replaceSegmentIfCrcMismatch(segmentDataManager, 
zkMetadata, indexLoadingConfig); } + } + } + + @Override + public void addConsumingSegment(String segmentName) { + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot add CONSUMING segment: %s to table: %s", segmentName, + _tableNameWithType); + _logger.info("Adding CONSUMING segment: {}", segmentName); + Lock segmentLock = getSegmentLock(segmentName); + segmentLock.lock(); + try { + doAddConsumingSegment(segmentName); + } catch (Exception e) { + addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while adding CONSUMING segment", e)); + throw e; + } finally { + segmentLock.unlock(); + } + } + + private void doAddConsumingSegment(String segmentName) { + SegmentZKMetadata zkMetadata = getZKMetadata(segmentName); + // NOTE: We do not throw exception here because the segment might have just been committed before the state + // transition is processed. We can skip adding this segment, and rely on the following CONSUMING -> ONLINE + // state transition to add it. + if (zkMetadata.getStatus() != Status.IN_PROGRESS) { + _logger.warn("Segment: {} is already committed, skipping adding it as CONSUMING segment", segmentName); + return; + } + IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata); + handleUpsertPreload(zkMetadata, indexLoadingConfig); + SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); + if (segmentDataManager != null) { + _logger.warn("Segment: {} already exists, skipping adding it as CONSUMING segment", segmentName); Review Comment: nit: check if segmentDataManager is mutable/immutable segment and log it for debug ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java: ########## @@ -67,6 +67,8 @@ public interface PartitionUpsertMetadataManager extends Closeable { /** * Preload segments for the table partition. Segments can be added differently during preloading. 
+ * TODO: Revisit this and see if we can use the same IndexLoadingConfig for all segments. Tier info might be different + * for different segments. Review Comment: good catch, I think we could fix it like below: ``` void doPreloadSegmentWithSnapshot(...) { // need to make a copy of indexLoadingConfig; setSegmentTier(segmentZKMetadata.getTier()); // pass the copy of indexLoadingConfig to method below tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, segmentZKMetadata); } ``` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment immutableSegment) { _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); return; } - // Replacing segment takes multiple steps, and particularly need to access the oldSegment. Replace segment may - // happen in two threads, i.e. the consuming thread that's committing the mutable segment and a HelixTaskExecutor - // thread that's bringing segment from ONLINE to CONSUMING when the server finds consuming thread can't commit - // the segment in time. The slower thread takes the reference of the oldSegment here, but it may get closed by - // the faster thread if not synchronized. In particular, the slower thread may iterate the primary keys in the - // oldSegment, causing seg fault. So we have to take a lock here. - // However, we can't just reuse the existing segmentLocks. Because many methods of partitionUpsertMetadataManager - // takes this lock internally, but after taking snapshot RW lock. If we take segmentLock here (before taking - // snapshot RW lock), we can get into deadlock with threads calling partitionUpsertMetadataManager's other - // methods, like removeSegment. 
- // Adding segment should be done by a single HelixTaskExecutor thread, but do it with lock here for simplicity - // otherwise, we'd need to double-check if oldSegmentManager is null. - Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, segmentName); - segmentLock.lock(); - try { - SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); - if (oldSegmentManager == null) { - // When adding a new segment, we should register it 'before' it is fully initialized by - // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other - // segments may be invalidated, making the queries see less valid docs than expected. We should let query - // access the new segment asap even though its validDocId bitmap is still being filled by - // partitionUpsertMetadataManager. - registerSegment(segmentName, newSegmentManager); - partitionUpsertMetadataManager.addSegment(immutableSegment); - _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); - } else { - // When replacing a segment, we should register the new segment 'after' it is fully initialized by - // partitionUpsertMetadataManager to fill up its validDocId bitmap. Otherwise, the queries will lose the access - // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still - // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. - // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data - // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. 
- IndexSegment oldSegment = oldSegmentManager.getSegment(); - partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); - registerSegment(segmentName, newSegmentManager); - _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", - oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); - releaseSegment(oldSegmentManager); - } - } finally { - segmentLock.unlock(); - } - } - - @Override - protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { - // Cannot download consuming segment - if (zkMetadata.getStatus() == Status.IN_PROGRESS) { - return false; - } - // TODO: may support download from peer servers as well. - return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl()); - } - - void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata, - IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { - String uri = segmentZKMetadata.getDownloadUrl(); - if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) { - try { - // TODO: cleanup and consolidate the segment loading logic a bit for OFFLINE and REALTIME tables. - // https://github.com/apache/pinot/issues/9752 - downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri); - } catch (Exception e) { - _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e); - // Download from deep store failed; try to download from peer if peer download is setup for the table. - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw e; - } - } + SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); + if (oldSegmentManager == null) { + // When adding a new segment, we should register it 'before' it is fully initialized by + // partitionUpsertMetadataManager. 
Because when processing docs in the new segment, the docs in the other + // segments may be invalidated, making the queries see less valid docs than expected. We should let query + // access the new segment asap even though its validDocId bitmap is still being filled by + // partitionUpsertMetadataManager. + registerSegment(segmentName, newSegmentManager); + partitionUpsertMetadataManager.addSegment(immutableSegment); + _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); } else { - if (_peerDownloadScheme != null) { - downloadSegmentFromPeer(segmentName, indexLoadingConfig); - } else { - throw new RuntimeException("Peer segment download not enabled for segment " + segmentName); - } - } - } - - private void downloadSegmentFromDeepStore(String segmentName, IndexLoadingConfig indexLoadingConfig, String uri) { - // This could leave temporary directories in _indexDir if JVM shuts down before the temp directory is deleted. - // This is fine since the temporary directories are deleted when the table data manager calls init. - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile); - _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Failed to download segment {} from deep store: ", segmentName, e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); + // When replacing a segment, we should register the new segment 'after' it is fully initialized by + // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access + // to the valid docs in the old segment immediately, but the validDocId bitmap of the new segment is still + // being filled by partitionUpsertMetadataManager, making the queries see less valid docs than expected. + // When replacing a segment, the new and old segments are assumed to have same set of valid docs for data + // consistency, otherwise the new segment should be named differently to go through the addSegment flow above. + IndexSegment oldSegment = oldSegmentManager.getSegment(); + partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); + registerSegment(segmentName, newSegmentManager); + _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", + oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType); + releaseSegment(oldSegmentManager); } } /** - * Untars the new segment and replaces the existing segment. + * Replaces the CONSUMING segment with a downloaded sealed one. */ - private void untarAndMoveSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, File segmentTarFile, - File tempRootDir) - throws IOException { - File untarDir = new File(tempRootDir, segmentName); - File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile, untarDir).get(0); - _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile, untarDir); - File indexDir = new File(_indexDir, segmentName); - FileUtils.deleteQuietly(indexDir); - FileUtils.moveDirectory(untaredSegDir, indexDir); - _logger.info("Replacing LLC Segment {}", segmentName); - replaceLLSegment(segmentName, indexLoadingConfig); - } - - private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) { - File tempRootDir = null; - try { - tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." 
+ System.currentTimeMillis()); - File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); - // Next download the segment from a randomly chosen server using configured download scheme (http or https). - SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> { - List<URI> peerServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, - _peerDownloadScheme); - Collections.shuffle(peerServerURIs); - return peerServerURIs; - }, segmentTarFile); - _logger.info("Fetched segment {} successfully to {} of size {}", segmentName, segmentTarFile, - segmentTarFile.length()); - untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); - } catch (Exception e) { - _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme, - e); - throw new RuntimeException(e); - } finally { - FileUtils.deleteQuietly(tempRootDir); - } + public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata) + throws Exception { + String segmentName = zkMetadata.getSegmentName(); + _logger.info("Downloading and replacing CONSUMING segment: {} with sealed one", segmentName); Review Comment: s/sealed/committed one to be a bit more precise? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org