chenboat commented on a change in pull request #5336:
URL: https://github.com/apache/incubator-pinot/pull/5336#discussion_r448548971



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -268,20 +273,81 @@ public void addSegment(String segmentName, TableConfig 
tableConfig, IndexLoading
   }
 
   public void downloadAndReplaceSegment(String segmentName, 
LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
-      IndexLoadingConfig indexLoadingConfig) {
+      IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
+    if (!"PEER".equalsIgnoreCase(uri)) {
+      try {
+        downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
+      } catch (Exception e) {
+        // Download from deep store failed; try to download from peer if peer 
download is setup for the table.
+        if (isPeerSegmentDownloadEnabled(tableConfig)) {
+          downloadSegmentFromPeer(segmentName, 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), 
indexLoadingConfig);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      if (isPeerSegmentDownloadEnabled(tableConfig)) {
+        downloadSegmentFromPeer(segmentName, 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), 
indexLoadingConfig);
+      } else {
+        throw new RuntimeException("Peer segment download not enabled for 
segment " + segmentName);
+      }
+    }
+  }
+
+  private void downloadSegmentFromDeepStore(String segmentName, 
IndexLoadingConfig indexLoadingConfig, String uri) {
     File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + 
System.currentTimeMillis());
     File tempFile = new File(_indexDir, segmentName + ".tar.gz");
-    final File segmentFolder = new File(_indexDir, segmentName);
-    FileUtils.deleteQuietly(segmentFolder);
     try {
       SegmentFetcherFactory.fetchSegmentToLocal(uri, tempFile);
       _logger.info("Downloaded file from {} to {}; Length of downloaded file: 
{}", uri, tempFile, tempFile.length());
-      TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, 
tempSegmentFolder);
-      FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
-      replaceLLSegment(segmentName, indexLoadingConfig);
+      untarAndMoveSegment(segmentName, indexLoadingConfig, tempSegmentFolder, 
tempFile);
+    } catch (Exception e) {
+      _logger.warn("Failed to download segment {} from deep store: ", 
segmentName, e);
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(tempFile);
+      FileUtils.deleteQuietly(tempSegmentFolder);
+    }
+  }
+
+  private void untarAndMoveSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, File tempSegmentFolder,
+      File tempFile)
+      throws IOException, ArchiveException {
+    TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
+    _logger.info("Uncompressed file {} into tmp dir {}", tempFile, 
tempSegmentFolder);
+    final File segmentFolder = new File(_indexDir, segmentName);
+    FileUtils.deleteQuietly(segmentFolder);
+    FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
+    _logger.info("Replacing LLC Segment {}", segmentName);
+    replaceLLSegment(segmentName, indexLoadingConfig);
+  }
+
+  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
+    return SegmentFetcherFactory.HTTP_PROTOCOL
+        
.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
+        || SegmentFetcherFactory.HTTPS_PROTOCOL
+        
.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+  }
+
+  private void downloadSegmentFromPeer(String segmentName, String 
downloadScheme, IndexLoadingConfig indexLoadingConfig) {
+    File tempSegmentFolder = new File(_indexDir, "tmp-" + segmentName + "." + 
System.currentTimeMillis());
+    File tempFile = new File(_indexDir, segmentName + ".tar.gz");
+    try {
+      RetryPolicies.exponentialBackoffRetryPolicy(RETRY_COUNT, RETRY_WAIT_MS, 
RETRY_DELAY_SCALE_FACTOR).attempt(() -> {

Review comment:
       done. thanks for the pointer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to