Jackie-Jiang commented on code in PR #12886:
URL: https://github.com/apache/pinot/pull/12886#discussion_r1577055850


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String 
segmentName) {
     }
   }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    return true;
-  }
-
-  protected File downloadSegment(String segmentName, SegmentZKMetadata 
zkMetadata)
-      throws Exception {
-    // TODO: may support download from peer servers for RealTime table.
-    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
-  }
-
-  private File downloadSegmentFromDeepStore(String segmentName, 
SegmentZKMetadata zkMetadata)
+  /**
+   * Downloads an immutable segment into the index directory.
+   * Segment can be downloaded from deep store or from peer servers. 
Downloaded segment might be compressed or
+   * encrypted, and this method takes care of decompressing and decrypting the 
segment.
+   */
+  protected File downloadSegment(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
-    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
-      try {
-        File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, 
zkMetadata, tempRootDir,
-            _streamSegmentDownloadUntarRateLimitBytesPerSec);
-        return moveSegment(segmentName, untaredSegDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    } else {
-      try {
-        File tarFile = downloadAndDecrypt(segmentName, zkMetadata, 
tempRootDir);
-        return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
-      } finally {
-        FileUtils.deleteQuietly(tempRootDir);
-      }
-    }
-  }
-
-  private File moveSegment(String segmentName, File untaredSegDir)
-      throws IOException {
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    Preconditions.checkState(downloadUrl != null,
+        "Failed to find download URL in ZK metadata for segment: %s of table: 
%s", segmentName, _tableNameWithType);
     try {
-      File indexDir = getSegmentDataDir(segmentName);
-      FileUtils.deleteDirectory(indexDir);
-      FileUtils.moveDirectory(untaredSegDir, indexDir);
-      return indexDir;
+      if 
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+        try {
+          return downloadSegmentFromDeepStore(zkMetadata);
+        } catch (Exception e) {
+          if (_peerDownloadScheme != null) {
+            return downloadSegmentFromPeers(zkMetadata);
+          } else {
+            throw e;
+          }
+        }
+      } else {
+        return downloadSegmentFromPeers(zkMetadata);
+      }
     } catch (Exception e) {
-      LOGGER.error("Failed to move segment: {} of table: {}", segmentName, 
_tableNameWithType);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1);
       throw e;
     }
   }
 
   @VisibleForTesting
-  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, 
File tempRootDir)
+  File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
       throws Exception {
-    File tarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    String uri = zkMetadata.getDownloadUrl();
-    boolean downloadSuccess = false;
+    String segmentName = zkMetadata.getSegmentName();
+    String downloadUrl = zkMetadata.getDownloadUrl();
+    _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl);
+    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
+    if (_segmentDownloadSemaphore != null) {
+      long startTime = System.currentTimeMillis();
+      _logger.info("Acquiring segment download semaphore for segment: {}, 
queue-length: {} ", segmentName,
+          _segmentDownloadSemaphore.getQueueLength());
+      _segmentDownloadSemaphore.acquire();
+      _logger.info("Acquired segment download semaphore for segment: {} 
(lock-time={}ms, queue-length={}).",
+          segmentName, System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
+    }
     try {
-      if (_segmentDownloadSemaphore != null) {
-        long startTime = System.currentTimeMillis();
-        LOGGER.info("Trying to acquire segment download semaphore for: {}. 
queue-length: {} ", segmentName,
-            _segmentDownloadSemaphore.getQueueLength());
-        _segmentDownloadSemaphore.acquire();
-        LOGGER.info("Acquired segment download semaphore for: {} 
(lock-time={}ms, queue-length={}).", segmentName,
-            System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
-      }
-      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, 
zkMetadata.getCrypterName());
-      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: 
{}, file length: {}", segmentName,
-          _tableNameWithType, uri, tarFile, tarFile.length());
-      downloadSuccess = true;
-      return tarFile;
-    } catch (AttemptsExceededException e) {
-      LOGGER.error("Attempts exceeded when downloading segment: {} for table: 
{} from: {} to: {}", segmentName,
-          _tableNameWithType, uri, tarFile);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
-      if (_peerDownloadScheme == null) {
-        throw e;
+      File untarredSegmentDir;
+      if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == 
null) {
+        _logger.info("Downloading segment: {} using streamed download-untar 
with maxStreamRateInByte: {}", segmentName,
+            _streamSegmentDownloadUntarRateLimitBytesPerSec);
+        AtomicInteger attempts = new AtomicInteger(0);
+        try {
+          untarredSegmentDir = 
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+              _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts);
+          _logger.info("Downloaded and untarred segment: {} from: {}, 
attempts: {}", segmentName, downloadUrl,
+              attempts.get());
+        } finally {
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+              attempts.get());
+        }
+      } else {
+        File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, 
segmentTarFile, zkMetadata.getCrypterName());
+        _logger.info("Downloaded tarred segment: {} from: {} to: {}, file 
length: {}", segmentName, downloadUrl,
+            segmentTarFile, segmentTarFile.length());
+        untarredSegmentDir = untarSegment(segmentName, segmentTarFile, 
tempRootDir);
       }
-      downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile);
-      downloadSuccess = true;
-      return tarFile;
+      File indexDir = moveSegment(segmentName, untarredSegmentDir);
+      _logger.info("Downloaded segment: {} from: {} to: {}", segmentName, 
downloadUrl, indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1);
+      throw e;
     } finally {
-      if (!downloadSuccess) {
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
-      }
       if (_segmentDownloadSemaphore != null) {
         _segmentDownloadSemaphore.release();
       }
+      FileUtils.deleteQuietly(tempRootDir);
     }
   }
 
-  protected void downloadFromPeersWithoutStreaming(String segmentName, 
SegmentZKMetadata zkMetadata, File destTarFile)
+  @VisibleForTesting
+  File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
       throws Exception {
+    String segmentName = zkMetadata.getSegmentName();
     Preconditions.checkState(_peerDownloadScheme != null, "Peer download is 
not enabled for table: %s",
         _tableNameWithType);
+    _logger.info("Downloading segment: {} from peers", segmentName);
+    File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());

Review Comment:
   Good point! It seems we no longer expose them in tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to