This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 26709271f2 Move untar method to BaseTaskExecutor and untar with
peerDownload segment if deepstore corrupted (#13964)
26709271f2 is described below
commit 26709271f2be1e82eb02478dc0a1bdaa1dd67970
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Thu Sep 12 10:52:05 2024 +0530
Move untar method to BaseTaskExecutor and untar with peerDownload segment
if deepstore corrupted (#13964)
---
.../BaseMultipleSegmentsConversionExecutor.java | 19 +++++--------------
.../tasks/BaseSingleSegmentConversionExecutor.java | 19 ++++++-------------
.../pinot/plugin/minion/tasks/BaseTaskExecutor.java | 21 ++++++++++++++++++---
3 files changed, 29 insertions(+), 30 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index a00883f34e..f16d93b151 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -200,30 +200,21 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
for (int i = 0; i < downloadURLs.length; i++) {
String segmentName = segmentNames[i];
- // Download the segment file
+ // Download and decompress the segment file
_eventObserver.notifyProgress(_pinotTaskConfig,
- String.format("Downloading segment from: %s (%d out of %d)",
downloadURLs[i], (i + 1),
+ String.format("Downloading and decompressing segment from: %s (%d out of %d)", downloadURLs[i], (i + 1),
downloadURLs.length));
- File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" +
i);
+ File indexDir;
try {
- downloadSegmentToLocal(tableNameWithType, segmentName,
downloadURLs[i], taskType, tarredSegmentFile);
+ indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURLs[i], taskType,
+ tempDataDir, "_" + i);
} catch (Exception e) {
LOGGER.error("Failed to download segment from download url: {}",
downloadURLs[i], e);
_minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
_eventObserver.notifyTaskError(_pinotTaskConfig, e);
throw e;
}
-
- // Un-tar the segment file
- _eventObserver.notifyProgress(_pinotTaskConfig,
- String.format("Decompressing segment from: %s (%d out of %d)",
downloadURLs[i], (i + 1),
- downloadURLs.length));
- File segmentDir = new File(tempDataDir, "segmentDir_" + i);
- File indexDir = TarCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
inputSegmentDirs.add(indexDir);
- if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
- LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
- }
reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
SegmentMetadataImpl segmentMetadata = new
SegmentMetadataImpl(indexDir);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index c75fa99bf8..e35a9c5cd5 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -97,12 +97,13 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(),
taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs(), "Failed to create temporary
directory: %s", tempDataDir);
try {
- // Download the tarred segment file
- _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading segment from: " + downloadURL);
- File tarredSegmentFile = new File(tempDataDir, "tarredSegment");
- LOGGER.info("Downloading segment from {} to {}", downloadURL,
tarredSegmentFile.getAbsolutePath());
+ // Download and decompress the segment file
+ _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: "
+ + downloadURL);
+ File indexDir;
try {
- downloadSegmentToLocal(tableNameWithType, segmentName, downloadURL,
taskType, tarredSegmentFile);
+ indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURL, taskType,
+ tempDataDir, "");
} catch (Exception e) {
LOGGER.error("Failed to download segment from download url: {}",
downloadURL, e);
_minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
@@ -110,14 +111,6 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
throw e;
}
- // Un-tar the segment file
- _eventObserver.notifyProgress(_pinotTaskConfig, "Decompressing segment from: " + downloadURL);
- File segmentDir = new File(tempDataDir, "segmentDir");
- File indexDir = TarCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
- if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
- LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
- }
-
// Publish metrics related to segment download
reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index a33b2c4005..c61a5b58d9 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -110,20 +111,27 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
_minionMetrics.addMeteredTableValue(tableName, taskType, meter, unitCount);
}
- protected void downloadSegmentToLocal(String tableNameWithType, String
segmentName, String deepstoreURL,
- String taskType, File tarredSegmentFile)
+ protected File downloadSegmentToLocalAndUntar(String tableNameWithType,
String segmentName, String deepstoreURL,
+ String taskType, File tempDataDir, String suffix)
throws Exception {
- LOGGER.info("Downloading segment {} from {} to {}", segmentName,
deepstoreURL, tarredSegmentFile.getAbsolutePath());
+ File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile" +
suffix);
+ File segmentDir = new File(tempDataDir, "segmentDir" + suffix);
+ File indexDir;
TableConfig tableConfig = getTableConfig(tableNameWithType);
String crypterName =
tableConfig.getValidationConfig().getCrypterClassName();
+ LOGGER.info("Downloading segment {} from {} to {}", segmentName,
deepstoreURL, tarredSegmentFile.getAbsolutePath());
+
try {
// download from deepstore first
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(deepstoreURL,
tarredSegmentFile, crypterName);
+ // untar the segment file
+ indexDir = TarCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
} catch (Exception e) {
LOGGER.error("Segment download failed from deepstore for {}, crypter:{}", deepstoreURL, crypterName, e);
String peerDownloadScheme =
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
taskType,
MINION_CONTEXT.isAllowDownloadFromServer()) && peerDownloadScheme !=
null) {
+ // if allowDownloadFromServer is enabled, download the segment from a peer server as deepstore download failed
LOGGER.info("Trying to download from servers for segment {} post deepstore download failed", segmentName);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName,
peerDownloadScheme, () -> {
List<URI> uris =
@@ -132,9 +140,16 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
Collections.shuffle(uris);
return uris;
}, tarredSegmentFile, crypterName);
+ // untar the segment file
+ indexDir = TarCompressionUtils.untar(tarredSegmentFile,
segmentDir).get(0);
} else {
throw e;
}
+ } finally {
+ if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred input segment: {}",
tarredSegmentFile.getAbsolutePath());
+ }
}
+ return indexDir;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org